kafka是通過心跳機制來控制消費超時,心跳機制對於消費者客戶端來說是無感的,它是乙個非同步執行緒,當我們啟動乙個消費者例項時,心跳執行緒就開始工作了。心跳超時會導致訊息重複消費。
在org.apache.kafka.clients.consumer.internals.abstractcoordinator中會啟動乙個heartbeatthread執行緒來定時傳送心跳和檢測消費者的狀態。每個消費者都有個org.apache.kafka.clients.consumer.internals.consumercoordinator,而每個consumercoordinator都會啟動乙個heartbeatthread執行緒來維護心跳,心跳資訊存放在org.apache.kafka.clients.consumer.internals.heartbeat中,宣告的schema如下所示:
private final int sessiontimeoutms;
private final int heartbeatintervalms;
private final int maxpollintervalms;
private final long retrybackoffms;
private volatile long lastheartbeatsend;
private long lastheartbeatreceive;
private long lastsessionreset;
private long lastpoll;
private boolean heartbeatfailed;
心跳執行緒實現方法
public void run()
if (state != memberstate.stable)
client.pollnowakeup();
long now = time.milliseconds();
if (coordinatorunknown()) else if (heartbeat.sessiontimeoutexpired(now)) else if (heartbeat.polltimeoutexpired(now)) else if (!heartbeat.shouldheartbeat(now)) else
}@override
public void onfailure(runtimeexception e) else }}
});}}}
} catch (authenticationexception e) catch (groupauthorizationexception e) catch (interruptedexception | interruptexception e) catch (throwable e) finally
}
在心跳執行緒中這裡面包含兩個最重要的超時函式,分別是sessiontimeoutexpired() 和 polltimeoutexpired()。
public boolean sessiontimeoutexpired(long now)
public boolean polltimeoutexpired(long now)
如果sessiontimeout超時,則會被標記為當前協調器處理斷開, 即將將消費者移除,重新分配分割槽和消費者的對應關係。在kafka broker server中,consumer group定義了5中(如果算上unknown,應該是6種狀態)狀態,org.apache.kafka.common.consumergroupstate,如下圖所示:
如果觸發了poll超時,此時消費者客戶端會退出consumergroup,當再次poll的時候,會重新加入到consumergroup,觸發消費者再平衡策略rebalancegroup。而kafkaconsumer client是不會幫我們重複poll的,需要我們自己在實現的消費邏輯中不停的呼叫poll方法。
tcp心跳機制
對連線上來的連線,進行檢測,以防止客戶端異常關閉,或線路異常斷開,而伺服器不知道,得到乙個半連線這種情況。當然可以在協議裡加乙個心跳包,然後伺服器端定時檢測,過一段時間就去輪訓一次,看哪些連線超過多少時間沒有反應。超時就關閉。但這樣有點不爽,要自己寫程式碼來完成。還要鎖定連線列表,代價挺大的。記得以...
Eureka 心跳機制
server服務端 server port 8761 eureka client 例項是否在eureka伺服器上註冊自己的資訊以提供其他服務發現,預設為true register with eureka false 此客戶端是否獲取eureka伺服器登錄檔上的註冊資訊,預設為true fetch r...
tcp心跳機制
對連線上來的連線,進行檢測,以防止客戶端異常關閉,或線路異常斷開,而伺服器不知道,得到乙個半連線這種情況。當然可以在協議裡加乙個心跳包,然後伺服器端定時檢測,過一段時間就去輪訓一次,看哪些連線超過多少時間沒有反應。超時就關閉。但這樣有點不爽,要自己寫程式碼來完成。還要鎖定連線列表,代價挺大的。記得以...