從kafka消費訊息,kafka客戶端提供兩種模式: 分割槽消費,分組消費。
分割槽消費對應的就是我們的directkafkainputdstream
分組消費對應的就是我們的kafkainputdstream
消費者數目跟分割槽數目的關係:
1),乙個消費者可以消費乙個到全部分割槽資料
2),分組消費,同乙個分組內所有消費者消費乙份完整的資料,此時乙個分割槽資料只能被乙個消費者消費,而乙個消費者可以消費多個分割槽資料
3),同乙個消費組內,消費者數目大於分割槽數目後,消費者會有空餘=分割槽數-消費者數
二,分組消費的再平衡策略
當乙個group中,有consumer加入或者離開時,會觸發partitions均衡partition.assignment.strategy,決定了partition分配給消費者的分配策略,有兩種分配策略:
1,org.apache.kafka.clients.consumer.rangeassignor
預設採用的是這種再平衡方式,這種方式分配只是針對消費者訂閱的topic的單個topic所有分割槽再分配,consumer rebalance的演算法如下:
1),將目標topic下的所有partirtion排序,存於tp
2),對某consumer group下所有consumer按照名字根據字典排序,存於cg,第i個consumer記為ci
3),n=size(tp)/size(cg)
4),r=size(tp)%size(cg)
5),ci獲取的分割槽起始位置=n*i+min(i,r)
6),ci獲取的分割槽總數=n+(if (i+ 1 > r) 0 else 1)
2,org.apache.kafka.clients.consumer.roundrobinassignor
這種分配策略是針對消費者消費的所有topic的所有分割槽進行分配。當有新的消費者加入或者有消費者退出,就會觸發rebalance。這種方式有兩點要求
a),在例項化每個消費者時給每個topic指定相同的流數
b),每個消費者例項訂閱的topic必須相同
maptopiccountmap = new hashmap();
topiccountmap.put(topic, new integer(1));
map>> consumermap = consumer.createmessagestreams(topiccountmap);
其中,topic對應的value就是流數目。對應的kafka原始碼是在
在kafka.consumer.zookeeperconsumerconnector的consume方法裡,根據這個引數構建了相同數目的kafkastream。
這種策略的具體分配步驟:
1),對所有topic的所有分割槽按照topic+partition轉string之後的hash進行排序
2),對消費者按字典進行排序
3),然後輪訓的方式將分割槽分配給消費者
3,舉例對比
舉個例子,比如有兩個消費者(c0,c1),兩個topic(t0,t1),每個topic有三個分割槽p(0-2),
那麼採用rangeassignor,結果為:
* c0: [t0p0, t0p1, t1p0, t1p1]
* c1: [t0p2, t1p2]
採用roundrobinassignor,結果為:
* c0: [t0p0, t0p2, t1p1]
* c1: [t0p1, t1p0, t1p2]
工作 學習和生活的再平衡
經過一段時間的努力磨合,慢慢的工作,學習和生活又處於再平衡狀態了,記錄下一些思考就當做是總結吧。當下 是佛經裡面最小的時間單位,1秒鐘有3600個當下,把時間切到很小很小的單位。那什麼又是活在當下呢?我理解就是去除雜念,吃飯的時候體會咀嚼和吞嚥的過程,睡覺的時候體會身體逐漸失去意識,書法繪畫體會筆尖...
kafka的leader選舉過程 再平衡
afka在所有broker中選出乙個controller,所有partition的leader選舉都由controller決定。controller會將leader的改變直接通過rpc的方式 比zookeeper queue的方式更高效 通知需為此作出響應的broker。同時controller也負...
Kafka消費分組和分割槽分配策略
kafka消費分組,訊息消費原理 同乙個消費組裡的消費者不能消費同乙個分割槽,不同消費組的消費組可以消費同乙個分割槽 kafka分割槽分配策略 在 kafka 內部存在兩種預設的分割槽分配策略 range 和 roundrobin。當以下事件發生時,kafka 將會進行一次分割槽分配 將分割槽的所有...