首先簡單說下對kafka的理解:
1、kafka是乙個分布式的訊息快取系統;
2、kafka集群中的伺服器節點都被稱作broker
3、kafka的客戶端分為:一是producer(訊息生產者)負責往訊息佇列中放入訊息;另一類是consumer(訊息消費者)負責從訊息佇列中取訊息。客戶端和伺服器之間的通訊採用tcp協議
4、kafka中不同業務系統的訊息可以通過topic(主題)進行區分,也就是說乙個主題就是乙個訊息佇列,而且每乙個訊息topic都會被分割槽,以分擔訊息讀寫的負載
5、parition(分割槽)是物理上的概念,每個topic包含乙個或多個partition,建立topic時可指定parition數量。每個partition對應於乙個資料夾,該資料夾下儲存該partition的資料和索引檔案。每乙個分割槽都可以有多個副本,以防止資料的丟失
6、某乙個分割槽中的資料如果需要更新,都必須通過該分割槽所有副本中的leader來更新
7、消費者可以分組,每乙個consumer屬於特定的組,同一topic的一條訊息只能被同乙個consumer group內的乙個consumer消費,但多個consumer group可同時消費這一訊息。比如有兩個消費者組a和b,共同消費乙個topic:topic-1,a和b所消費的訊息不會重複.
比如 topic-1中有100個訊息,每個訊息有乙個id,編號從0-99,那麼,如果a組消費0-49號,b組就消費50-99號
8、消費者在具體消費某個topic中的訊息時,可以指定起始偏移量
集群安裝、啟動
tar xf kafka_2.10-0.8.1.1.tgz
cd kafka_2.10-0.8.1.1
2、修改config/server.properties配置檔案
broker.id=1
zookeeper.connect=192.168
.2.100:2181, 192.168
.2.110:2181, 192.168
.2.120:2181
注:kafka集群依賴zookeeper集群,所以此處需要配置zookeeper集群;zookeeper集群配置請參見:
3、將kafka解壓包使用scp命令拷貝至集群其他節點,命令:
scp -r kafka_2.10-0.8.1.1/ 192.168.2.110
4、將zookeeper集群啟動,請參見:
5、在每一台節點上啟動broker
bin/kafka-server-start.sh config/server.properties
//執行在後台命令:
bin/kafka-server-start.sh config/server.properties 1>/dev/null
2>&1 &
//使用jps命令檢視是否啟動
[hadoop@hadoop1-1 kafka_2.10-0.8
.1.1]$ jps
2400 jps
2360 kafka
2289 quorumpeermain
**
**
1、在kafka集群中建立乙個topic
[
hadoop@hadoop1-1
kafka_2.10
-0.8
.1.1
]$bin/kafka
-topics.sh
--create--
zookeeper
192.
168.2.
100:2181--
replication
-factor3-
-partitions1-
-topic
topictest
created
topic
"topictest"
.
replication-factor:表示副本數量
partitions :分割槽數量
2、用乙個producer向某乙個topic中寫入訊息
[hadoop@hadoop1-1 kafka_2.10-0.8
.1.1]$ bin/kafka-console-producer.sh --broker-list 192.168
.2.100:9092 --topic topictest
slf4j: failed to load class "org.slf4j.impl.staticloggerbinder".
slf4j: defaulting to no-operation (nop) logger implementation
slf4j: see
#staticloggerbinder for further details.
3、用乙個comsumer從某乙個topic中讀取資訊
[hadoop@hadoop1-2 kafka_2.10-0.8
.1.1]$ bin/kafka-console-consumer.sh --zookeeper 192.168
.2.100:2181 --from-beginning --topic topictest
slf4j: failed to load class "org.slf4j.impl.staticloggerbinder".
slf4j: defaulting to no-operation (nop) logger implementation
slf4j: see
#staticloggerbinder for further details.
4、檢視乙個topic的分割槽及副本狀態資訊
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.110
:2181 --topic topictest
topic
:topictest
partitioncount:1
replicationfactor:3
configs
:topic
: topictest partition:0
leader:1
replicas
:1,0,2
isr:
1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.100
:2181 --topic topictest
topic
:topictest
partitioncount:1
replicationfactor:3
configs
:topic
: topictest partition:0
leader:1
replicas
:1,0,2
isr:
1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --describe --zookeeper 192.168.2.120
:2181 --topic topictest
topic
:topictest
partitioncount:1
replicationfactor:3
configs
:topic
: topictest partition:0
leader:1
replicas
:1,0,2
isr:
1,0,2
[hadoop@hadoop1-3 kafka_2.10-0.8.1.1]$
5、檢視topic
bin/kafka
-topics.sh
--list--
zookeeper
192.
168.2.
100:2181
分布式訊息佇列kafka
kafka是linkedin開源的分布式發布 訂閱訊息系統 訊息佇列 kafka特點 1 高吞吐率 低延遲,每秒處理幾十萬訊息,延遲最低幾毫秒 2 可擴充套件性,支援動態擴充套件節點資料 3 永續性與可靠性,資料被持久化磁碟,支援資料多副本防止資料丟失 4 高容錯,允許節點失敗 5 高併發,支援上千...
Kafka分布式訊息佇列
可快速持久化。通過o 1 的磁碟資料結構提供訊息的持久化,這種結構對於即使數以tb的訊息儲存也能夠保持長時間的穩定性能 高吞吐量。即使是非常普通的硬體kafka也可以支援每秒數百萬的訊息 完全的分布式系統。它的broker producer consumer都原生地支援分布式,自動支援負載均衡 pa...
Kafka分布式訊息佇列框架
既有的訊息佇列框架或者對訊息傳送的可靠性提供了較高的保證,由此帶來較大的負擔,不能滿足海量高吞吐率的要求 或者完全面向實時訊息處理系統,對於批量離線處理的場合無法提供足夠的快取和永續性要求。如何實現 kafka的集群有多個broker伺服器組成,每個型別的訊息被定義為topic,同一topic內部的...