在kafka0.9版本之前消費者儲存的偏移量預設是在zookeeper中/consumers/group.id/offsets/topic.name/partition.id。也可以儲存到mysql等其他工具中。0.9之後消費者不在儲存偏移量到zookeeper中,而是kafka本身管理消費者的偏移量,kafka將偏移量儲存在乙個內部主題中「__consumer_offsets」,該主題預設有50個分割槽,每個分割槽3個副本,分割槽數量有引數offset.topic.num.partition設定。通過消費者組id的雜湊值和該引數取模的方式來確定某個消費者組已消費的偏移量儲存到__consumer_offsets主題的哪個分割槽中。
kafka消費者api提供兩種方法用來查詢偏移量。乙個是committed(topicpartition partition)方法,這個方法返回乙個offsetandmetadata物件,通過這個物件可以獲取指定分割槽已提交的偏移量;另外乙個方法position(topicpartition partition)返回的是下一次拉取位置。
同時kafka消費者還提供了重置消費偏移量的方法,seek(topicpartition partition, long offset),該方法用於指定消費起始位置,另外還有seektobeginning()和seektoend(),意思就是從頭消費和從最後消費。
偏移量提交有自動和手動,預設是自動(enable.auto.commit = true)。自動提交的話每隔多久自動提交一次呢?這個由消費者協調器引數auto.commit.interval.ms 毫秒執行一次提交。有些場景我們需要手動提交偏移量,尤其是在乙個長事務中並且保證訊息不被重複消費以及訊息不丟失,比如生產者乙個訂單提交訊息,消費者拿到後要扣減庫存,扣減成功後該訊息才能提交,所以在這種場景下需要手動提交,因為庫存扣減失敗這個訊息就不能消費,同時客戶這個訂單狀態也不能是成功。手動提交也有兩種乙個是同步提交乙個是非同步提交,其區別就是消費者執行緒是否阻塞。如果使用手動提交就要關閉自動提交,因為自動提交預設是開啟的。
基於Kafka 0 9版本 使用ACL進行許可權控制
kafka附帶乙個可插拔的認證,並使用zookeeper來儲存所有的acl。kafka的acl在一般格式定義 principal p is allowed denied operation o from host h on resource r 你可以閱讀更多關於kip 11的結構,為了新增,刪除或...
kafka檢視消費組未消費資料情況0 9之後版本
bin kafka topics.sh zookeeper 127.0.0.1 2181 list 檢視topic bin kafka consumer groups.sh new consumer bootstrap server localhost 9092 list 檢視消費組 bin kaf...
09之 interrupt 和執行緒終止方式
interrupt 是給執行緒設定中斷標誌 interrupted 是檢測中斷並清除中斷狀態 isinterrupted 只檢測中斷。還有重要的一點就是interrupted 作用於當前執行緒,interrupt 和isinterrupted 作用於此執行緒,即 中呼叫此方法的例項所代表的執行緒。首...