多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。
1.首先修改server.properties裡:
num.partitions=3
這裡等於3是因為本人的集群是用了三颱機子,也就是3個broker,所以設定成3,具體數值可以根據集群情況設定。
2.建立topic:
bin/kafka-topics.sh –create –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –replication-factor 3 –partitions 3 –topic test
這裡的3對應上面的配置裡的num.partitions=3
3.檢視topic資訊:
./kafka-topics.sh –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –topic test–describe
會有如下顯示表示建立成功:
topic:test partitioncount:3 replicationfactor:3 configs:
topic: test partition: 0 leader: 2 replicas: 2,0,1 isr: 0,1,2
topic: test partition: 1 leader: 0 replicas: 0,1,2 isr: 0,1,2
topic: test partition: 2 leader: 1 replicas: 1,2,0 isr: 0,1,2
二:生產者隨機分割槽提交資料
這也是乙個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費;
這裡借用了一片**,自定義隨機分割槽:
public class mypartition implements partitioner
@override
public void close()
@override
public int partition(string topic, object key, byte keybytes, object value,
byte valuebytes, cluster cluster) catch (exception e)
// system.out.println("kafkamessage topic:"+ topic+" |key:"+ key+" |value:"+value);
return math.abs(partitionnum % numpartitions);}}
1234
5678
9101112
1314
1516
1718
1920
2122
2324
2526
27然後在初始化kafka生產者配置的時候修改如下配置
props.put("partitioner.class", properties.getproperty("com.mykafka.mypartition"));
這樣就實現了kafka生產者隨機分割槽提交資料。
三:消費者
最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:
executorservice fixedthreadpool = executors.newfixedthreadpool(3);
for (int i = 0; i < 3; i++)
});}12
3456
789在消費方面得注意,這裡得遍歷所有分割槽,否則還是只消費了乙個區:
consumerrecordsrecords = consumer.poll(1000);
for (topicpartition partition : records.partitions()) else }}
1234
5678
9101112
1314
1516
17
rdkafka執行緒過多 kafka 多執行緒消費
一 1 kafka的消費並行度依賴topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費 每台機器只能開啟乙個執行緒 或者一台機器消費 10個執行緒並行消費 即消費並行度和分割槽數一致。2 1 如果指定了某個分割槽,會只講訊息發到這個分割槽上 2 如果同時指定了某個分割槽和key,...
kafka系列 多執行緒消費者實現
看了一下kafka,然後寫了消費kafka資料的 感覺自己功力還是不夠。不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。不能很好地組織 結構,設計模式沒學好,物件導向思想理解不夠成熟。用佇列來儲存要消費的資料。用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。...
在 DOS 下實現多執行緒
在 dos 下實現多執行緒 程式在 turbo c 3.0 下除錯通過,採用了最簡單的時間片輪轉法,實現了多執行緒的系統,程式盡量採用了最簡潔的技術來實現多工的系統,主要使用到了c標準庫中的setjmp和longjmp兩個函式,程式絕大部分都是採用c c 語言書寫,但是仍然不可避免的採用了三句內嵌彙...