集群下的kafka實現多執行緒消費

2021-09-23 06:19:23 字數 2073 閱讀 6601

多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。

1.首先修改server.properties裡:

num.partitions=3

這裡等於3是因為本人的集群是用了三颱機子,也就是3個broker,所以設定成3,具體數值可以根據集群情況設定。

2.建立topic:

bin/kafka-topics.sh –create –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –replication-factor 3 –partitions 3 –topic test

這裡的3對應上面的配置裡的num.partitions=3

3.檢視topic資訊:

./kafka-topics.sh –zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 –topic test–describe

會有如下顯示表示建立成功:

topic:test partitioncount:3 replicationfactor:3 configs:

topic: test partition: 0 leader: 2 replicas: 2,0,1 isr: 0,1,2

topic: test partition: 1 leader: 0 replicas: 0,1,2 isr: 0,1,2

topic: test partition: 2 leader: 1 replicas: 1,2,0 isr: 0,1,2

二:生產者隨機分割槽提交資料

這也是乙個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費;

這裡借用了一片**,自定義隨機分割槽:

public class mypartition implements partitioner

@override

public void close()

@override

public int partition(string topic, object key, byte keybytes, object value,

byte valuebytes, cluster cluster) catch (exception e)

//        system.out.println("kafkamessage topic:"+ topic+" |key:"+ key+" |value:"+value);

return math.abs(partitionnum  % numpartitions);}}

1234

5678

9101112

1314

1516

1718

1920

2122

2324

2526

27然後在初始化kafka生產者配置的時候修改如下配置

props.put("partitioner.class", properties.getproperty("com.mykafka.mypartition"));

這樣就實現了kafka生產者隨機分割槽提交資料。

三:消費者

最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:

executorservice fixedthreadpool = executors.newfixedthreadpool(3);

for (int i = 0; i < 3; i++)

});}12

3456

789在消費方面得注意,這裡得遍歷所有分割槽,否則還是只消費了乙個區:

consumerrecordsrecords = consumer.poll(1000);

for (topicpartition partition : records.partitions()) else }}

1234

5678

9101112

1314

1516

17

rdkafka執行緒過多 kafka 多執行緒消費

一 1 kafka的消費並行度依賴topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費 每台機器只能開啟乙個執行緒 或者一台機器消費 10個執行緒並行消費 即消費並行度和分割槽數一致。2 1 如果指定了某個分割槽,會只講訊息發到這個分割槽上 2 如果同時指定了某個分割槽和key,...

kafka系列 多執行緒消費者實現

看了一下kafka,然後寫了消費kafka資料的 感覺自己功力還是不夠。不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。不能很好地組織 結構,設計模式沒學好,物件導向思想理解不夠成熟。用佇列來儲存要消費的資料。用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。...

在 DOS 下實現多執行緒

在 dos 下實現多執行緒 程式在 turbo c 3.0 下除錯通過,採用了最簡單的時間片輪轉法,實現了多執行緒的系統,程式盡量採用了最簡潔的技術來實現多工的系統,主要使用到了c標準庫中的setjmp和longjmp兩個函式,程式絕大部分都是採用c c 語言書寫,但是仍然不可避免的採用了三句內嵌彙...