上面兩篇聊了kafka概況和kafka生產者,包含了kafka的基本概念、設計原理、設計核心以及生產者的核心原理。本篇單獨聊聊kafka的消費者,包括如下內容:
概念kafka消費者物件訂閱主題並接收kafka的訊息,然後驗證訊息並儲存結果。
kafka消費者是消費者組的一部分。乙個消費者組裡的消費者訂閱的是同乙個主題,每個消費者接收主題一部分分割槽的訊息。
消費者組的設計是對消費者進行的乙個橫向伸縮,用於解決消費者消費資料的速度跟不上生產者生產資料的速度的問題,通過增加消費者,讓它們分擔負載,分別處理部分分割槽的訊息。
消費者數目與分割槽數目
在乙個消費者組中的消費者消費的是乙個主題的部分分割槽的訊息,而乙個主題中包含若干個分割槽,乙個消費者組中也包含著若干個消費者。當二者的數量關係處於不同的大小關係時,kafka消費者的工作狀態也是不同的。看以下三種情況:
消費者數目《分割槽數目:此時不同分割槽的訊息會被均衡地分配到這些消費者;
消費者數目=分割槽數目:每個消費者會負責乙個分割槽的訊息進行消費;
消費者數目》分割槽數目:此時會有多餘的消費者處於空閒狀態,其他的消費者與分割槽一對一地進行消費。
分割槽再均衡
當消費者數目與分割槽數目在以上三種關係間變化時,比如有新的消費者加入、或者有乙個消費者發生崩潰時,會發生分割槽再均衡。
分割槽再均衡是指分割槽的所有權從乙個消費者轉移到另乙個消費者。再均衡為消費者組帶來了高可用性和伸縮性。但是同時,也會發生如下問題:
訂閱主題
建立了kafka消費者之後,接著就可以訂閱主題了。訂閱主題可以使用如下兩個 api :
輪詢消費
1234567
891011
12
try}} finally
與生產者類似,消費者也有完整的配置列表。接下來一一介紹這些重要的屬性。
fetch.min.byte
消費者從伺服器獲取記錄的最小位元組數。如果可用的資料量小於設定值,broker 會等待有足夠的可用資料時才會把它返回給消費者。主要是為了降低消費者和broker的工作負載。
fetch.max.wait.ms
broker 返回給消費者資料的等待時間,預設是 500ms。如果消費者獲取最小資料量的要求得不到滿足,就會在等待最多該屬性所設定的時間後獲取到資料。實際要看二者哪個條件先滿足。
max.partition.fetch.bytes
該屬性指定了伺服器從每個分割槽返回給消費者的最大位元組數,預設為 1mb。
session.timeout.ms
消費者在被認為死亡之前可以與伺服器斷開連線的時間,預設是 3s。
auto.offset.reset
該屬性指定了消費者在讀取乙個沒有偏移量的分割槽或者偏移量無效的情況下該作何處理:
如果某個提交失敗,同步提交還會進行重試,這可以保證資料能夠最大限度提交成功,但是同時也會降低程式的吞吐量。
非同步提交
為了解決同步提交降低程式吞吐量的問題,又有了非同步提交的方案。
非同步提交可以提高程式的吞吐量,因為此時你可以儘管請求資料,而不用等待 broker 的響應。**樣例如下:
1234567
891011
12
while (true)}});
}
非同步提交如果失敗,錯誤資訊和偏移量都會被記錄下來。
儘管如此,非同步提交存在的問題是,如果提交失敗不能重試,因為重試可能會出現小偏移量覆蓋大偏移量的問題。
雖然程式不能在失敗時候進行自動重試,但是我們是可以手動進行重試。可以通過乙個 mapoffsets 來維護你提交的每個分割槽的偏移量,也就是非同步提交的順序,在每次提交偏移量之後或在**裡提交偏移量時遞增序列號。然後當失敗時候,你可以判斷失敗的偏移量是否小於你維護的同主題同分割槽的最後提交的偏移量,如果小於則代表你已經提交了更大的偏移量請求,此時不需要重試,否則就可以進行手動重試。
同步和非同步組合提交:
當發生關閉消費者或者再均衡時,一定要確保能夠提交成功,為了保證效能和可靠性,又有了同步和非同步組合提交的方式。也就是在消費者關閉前組合使用commitasync()方法和commitsync()方法。**樣例如下:
1234567
891011
1213
1415
1617
1819
try// 非同步提交
consumer.commitasync();
}} catch (exception e) finally finally
}
提交特定的偏移量
上面的提交方式都是提交當前最大的偏移量,但如果需要提交的是特定的乙個偏移量呢?
只需要在過載的提交方法中傳入偏移量引數即可。**樣例如下:
1234
// 同步提交特定偏移量commitsync(mapoffsets)
// 非同步提交特定偏移量
commitasync(mapoffsets, offsetcommitcallback callback)
上面的消費過程都是以無限迴圈的方式來演示的,那麼如何來優雅地停止消費者的輪詢呢。
kafka 提供了 consumer.wakeup() 方法用於退出輪詢。
如果確定要退出迴圈,需要通過另乙個執行緒呼叫consumer.wakeup()方法;如果迴圈執行在主線程裡,可以在shutdownhook裡呼叫該方法。
它通過丟擲 wakeupexception 異常來跳出迴圈。需要注意的是,在退出執行緒時最好顯示的呼叫 consumer.close() , 此時消費者會提交任何還沒有提交的東西,並向群組協調器傳送訊息,告知自己要離開群組,接下來就會觸發再均衡 ,而不需要等待會話超時。
下面的示例**為監聽控制台輸出,當輸入 exit 時結束輪詢,關閉消費者並退出程式:
1234567
891011
1213
1415
1617
1819
2021
2223
2425
2627
2829
30
// 呼叫wakeup優雅的退出輪詢final thread mainthread = thread.currentthread();
new thread(() -> catch (interruptedexception e) }}
}).start();
try
}} catch (wakeupexception e) finally
【參考資料】
《kafka 權威指南》
kafka深入理解
對於傳統的message queue而言,一般會刪除已經被消費的訊息,而kafka集群會保留所有的訊息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有資料 實際上也沒必要 因此kafka提供兩種策略去刪除舊資料。一是基於時間,二是基於partition檔案大小。可以通過配置 kafka h...
深入理解Linux核心3
unix核心提供了應用程式可以執行的環境,因此,核心必須實現一組服務及相應的介面。應用程式使用這些介面而不會跟硬體資源直接互動。啟用核心例程的幾種方式 核心恢復乙個程序執行時,用程序描述符中的合適字段裝載cpu暫存器 等待狀態可能會有很多,有程序描述符佇列實現 自旋鎖 檢查訊號量耗時多,對於時間較短...
深入理解C語言 深入理解指標
關於指標,其是c語言的重點,c語言學的好壞,其實就是指標學的好壞。其實指標並不複雜,學習指標,要正確的理解指標。指標也是一種變數,占有記憶體空間,用來儲存記憶體位址 指標就是告訴編譯器,開闢4個位元組的儲存空間 32位系統 無論是幾級指標都是一樣的 p操作記憶體 在指標宣告時,號表示所宣告的變數為指...