一、
1、kafka的消費並行度依賴topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費(每台機器只能開啟乙個執行緒),或者一台機器消費(10個執行緒並行消費)。即消費並行度和分割槽數一致。
2、(1)如果指定了某個分割槽,會只講訊息發到這個分割槽上
(2)如果同時指定了某個分割槽和key,則也會將訊息傳送到指定分割槽上,key不起作用
(3)如果沒有指定分割槽和key,那麼將會隨機傳送到topic的分割槽中
(4)如果指定了key,那麼將會以hash的方式傳送到分割槽中
二、多執行緒消費例項
paritition 為3,broker為3,節點為3
1、生產者隨機分割槽提交資料
這也是乙個比較關鍵步驟,只有隨機提交到不同的分割槽才能實現多分割槽消費;
自定義隨機分割槽:
public class mypartition implements partitioner@override
public void close()
@override
public int partition(string topic, object key, byte keybytes, object value,
byte valuebytes, cluster cluster) catch (exception e)
// system.out.println("kafkamessage topic:"+ topic+" |key:"+ key+" |value:"+value);
return math.abs(partitionnum % numpartitions);
}}
然後在初始化kafka生產者配置的時候修改如下配置
props.put("partitioner.class", properties.getproperty("com.mykafka.mypartition"));
這樣就實現了kafka生產者隨機分割槽提交資料。
2、消費者
最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,本例知識用了最簡單的固定執行緒模式:
executorservice fixedthreadpool = executors.newfixedthreadpool(3);for (int i = 0; i < 3; i++)
});}
在消費方面得注意,這裡得遍歷所有分割槽,否則還是只消費了乙個區:
consumerrecordsrecords = consumer.poll(1000);for (topicpartition partition : records.partitions()) else
}}
注意上面的執行緒為啥只有3個,這裡得跟上面kafka的分割槽個數相對應起來,否則如果執行緒超過分割槽數量,那麼只會浪費執行緒,因為即使使用3個以上的執行緒也只會消費三個分割槽,而少了則無法消費完全。所以這裡必須更上面的對應起來。
kafka多執行緒消費topic的問題
案例 topic my topic,分割槽 6 消費者 部署三颱機器,每台機器上面開啟6個執行緒消費。消費結果 只有一台機器可以正常消費,另外兩台機器直接輸出六條告警日誌 no broker partitions consumed by consumer thread my topic group ...
集群下的kafka實現多執行緒消費
多執行緒消費,說白了就是多區消費,kafka可以給topic設定多個partition,從而實現生產的時候提交到不同的分割槽,以減少統一區塊的壓力。而消費則是從不同的分割槽裡拿資料進行消費。1.首先修改server.properties裡 num.partitions 3 這裡等於3是因為本人的集群...
kafka系列 多執行緒消費者實現
看了一下kafka,然後寫了消費kafka資料的 感覺自己功力還是不夠。不能隨心所欲地運算元據,資料結構沒學好,spark的rdd操作沒學好。不能很好地組織 結構,設計模式沒學好,物件導向思想理解不夠成熟。用佇列來儲存要消費的資料。用佇列來儲存要提交的offest,然後處理執行緒將其給回消費者提交。...