publicstaticvoidmain( string args) }} finally}
前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在《**kafka中的基本概念》 中介紹過了,消費組使得消費者的消費能力可橫向擴充套件,這次再介紹乙個新的概念 「 再均衡」 ,其意思是將分割槽的所屬權進行重新分配,發生於消費者中有新的消費者加入或者有消費者宕機的時候。我們先了解再均衡的概念,至於如何再均衡不在此深究。
public classconsumerrecords implementsiterable>
publicclassconsumerrecord< k, v>consumer.commitsync;;}
while( true) consumer.commitasync;;}
非同步提交時,程式將不會阻塞,但非同步提交在提交失敗時也不會進行重試,所以提交是否成功是無法保證的。因此我們可以組合使用兩種提交方式。在輪循中使用非同步提交,而當關閉消費者時,再通過同步提交來保證提交成功。
tryconsumer.commitasync;}} finally finally}
上述介紹的兩種無參的提交方式都是提交的 poll返回的乙個批次的資料。若未來得及提交,也會造成重複消費,如果還想更進一步減少重複消費,可以在 for 迴圈中為 commitasync和 commitsync傳入分割槽和偏移量,進行更細粒度的提交,例如每1000條訊息我們提交一次:
map currentoffsets = newhashmap<>;int count = 0;while( true) count++;}}
poll方法
關於提交就介紹到這裡。在使用消費者的**中,我們可以看到 poll 方法是其中最為核心的方法,能夠拉取到我們需要消費的訊息。所以接下來,我們一起深入到消費者 api 的幕後,看看在 poll方法中,都發生了什麼,其實現如下:
publicconsumerrecords poll( finalduration timeout)
在我們使用設定超時時間的 poll 方法中,會呼叫過載方法,第二個引數 includemetadataintimeout用於標識是否把元資料的獲取算在超時時間內,這裡傳值為 true,也就是算入超時時間內。下面再看過載的 poll方法的實現:
privateconsumerrecords poll( finaltimer timer, finalboolean includemetadataintimeout)
do else}
// 6.拉取訊息finalmap>> records = pollforfetches(timer);if(!records.isempty)
// 8.呼叫消費者***處理returnthis.interceptors.onconsume(new consumerrecords<>(records));}} while(timer.notexpired);
returnconsumerrecords.empty;} finally}
我們對上面的**逐步分析,首先是第1步 acquireandensureopen方法,獲取鎖並確保消費者沒有關閉,其實現如下:
privatevoidacquireandensureopen( )}
其中 acquire方法用於獲取鎖,為什麼這裡會要上鎖。這是因為 kafkaconsumer是執行緒不安全的,所以需要上鎖,確保只有乙個執行緒使用 kafkaconsumer拉取訊息,其實現如下:
privatestaticfinallongno_current_thread = - 1l;privatefinalatomiclong currentthread = newatomiclong(no_current_thread);privatefinalatomicinteger refcount = newatomicinteger( 0);privatevoidacquire
用乙個原子變數 currentthread作為鎖,通過 cas 操作獲取鎖,如果 cas 失敗,即獲取鎖失敗,表示發生了競爭,有多個執行緒在使用 kafkaconsumer,則會丟擲 concurrentmodificationexception異常,如果 cas 成功,還會將 refcount加一,用於重入。
第4步,安全的喚醒消費者,並不是喚醒,而是檢查是否有喚醒的風險,如果程式在執行不可中斷的方法或是收到中斷請求,會丟擲異常,這裡我還不是很明白,先放一下。
第5步,更新偏移量,就是我們在前文說的在進行拉取操作前會先檢查是否可以進行偏移量提交。
第6步, pollforfetches方法拉取訊息,其實現如下:
privatemap>> pollforfetches(timer timer)
// 2.準備拉取請求fetcher.sendfetches;
if(!cachedsubionhashallfetchpositions && polltimeout > retrybackoffms)
timer polltimer = time.timer(polltimeout);// 3.傳送拉取請求client.poll(polltimer, -> );timer.update(polltimer.currenttimems);// 3.返回訊息returnfetcher.fetchedrecords;}
如果 fetcher 已經有訊息了則立即返回,這裡和下面將要講的第7步對應。如果沒有訊息則使用 fetcher準備拉取請求然後再通過 consumernetworkclient傳送請求,最後返回訊息。
為啥訊息會已經有了呢,我們回到 poll的第7步,如果拉取到了訊息或者有未處理的請求,由於使用者還需要處理未處理的訊息,這時候可以使用非同步的方式發起下一次的拉取訊息的請求,將資料提前拉取,減少網路io的等待時間,提高程式的效率。
第8步,呼叫消費者***處理,就像 kafkaproducer中有 producerinterceptor,在 kafkaconsumer中也有 consumerinterceptor,用於處理返回的訊息,處理完後,再返回給使用者。
第9、10步,釋放鎖和記錄 poll 結束,對應了第1、2步。
對 kafkaconsumer的 poll方法就分析到這裡。最後用乙個思維導圖回顧下文中較為重要的知識點:
參考:《kafka權威指南》
《深入理解kafka核心設計和實踐原理》
你絕對能看懂的kafka源**分析-kafkaconsumer類**分析:
kafka消費者原始碼解析之一kafkaconsumer:
kafka 主動消費 Kafka基礎原理
1 訊息中介軟體作用 解耦 非同步 削峰 2 訊息中介軟體通訊模式 如上圖所示,點對點模式通常是基於拉取或者輪詢的訊息傳送模型,這個模型的特點是傳送到佇列的訊息被乙個且只有乙個消費者進行處理。生產者將訊息放入訊息佇列後,由消費者主動的去拉取訊息進行消費。點對點模型的的優點是消費者拉取訊息的頻率可以由...
kafka 檢視待消費資料 kafka檢視消費資料
kafka檢視消費資料 一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必...
kafka 檢視待消費資料 kafka檢視消費資料
一 如何檢視 在老版本中,使用kafka run class.sh 指令碼進行檢視。但是對於最新版本,kafka run class.sh 已經不能使用,必須使用另外乙個指令碼才行,它就是kafka consumer groups.sh 普通版檢視所有組 要想查詢消費資料,必須要指定組。那麼線上執行...