在高階api
中,消費者要從頭開始消費某個topic
的全量資料,需要滿足2個條件:
(1)使用乙個全新的"group.id"(就是之前沒有被任何消費者使用過);
(2)使用assign來訂閱;
注意:如果把"enable.auto.commit" 設為 "false"
,使用consumer.commitasync(currentoffsets, null)
手動提交offset
,是不能從頭開始消費的
auto.offset.reset值含**釋
earliest也就是說無論哪種設定,只要當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
latest
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料
none
topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有乙個分割槽不存在已提交的offset,則丟擲異常
kafka
中相同group
、partition
中已經有提交的offset
,則都無法從開始消費。
參考論壇:伺服器重啟了,那麼該group是否會重新消費伺服器裡面所有的訊息
kafkaconsumer.subscribe() : 為consumer自動分配partition,有內部演算法保證topic-partition以最優的方式均勻分配給同group下的不同consumer。如果有多個partition且只有乙個消費者,則按順序消費所有分割槽。不會重複消費。
kafkaconsumer.assign() : 為consumer手動、顯示的指定需要消費的topic-partitions,不受group.id限制,不提交offset,相當與指定的group無效(this method does not use the consumer's group management)。可以重複消費。
或者,這樣做:
目前就high level api
而言,offset
是存於zookeeper
中的,無法存於hdfs
,而low level api
的offset
是由自己去維護的,可以將之存於hdfs
中。
Kafka 如何從指定時間開始消費Topic
0.10.1.1版本的kafka增加了時間索引檔案,可以根據指定的時間戳計算出的offset來訪問topicpartition中的訊息。public class timestampoffsetconsumer consumer.assign topicpartitions 獲取每個partition...
kafka重複消費問題
問題描述 採用kafka讀取訊息進行處理時,consumer會重複讀取afka佇列中的資料。問題原因 kafka的consumer消費資料時首先會從broker裡讀取一批訊息資料進行處理,處理完成後再提交offset。而我們專案中的consumer消費能力比較低,導致取出的一批資料在session....
kafka重複消費 漏消費情況
kafka重複消費的情況 資料沒有丟,只是資料重複消費了。丟不丟資料指的是producer到broker的過程,以及broker儲存資料的過程。重複消費 漏消費指的是消費結果,所以我們記憶這些過程的時候,或者定位問題的時候,首先應該明確,是丟資料了還是重複消費了。重複消費 ack 1 produce...