kafka producer寫詳細過程

2021-09-29 11:19:33 字數 2501 閱讀 9565

2  客戶端寫入引數。

batch.size

通過這個引數來設定批量提交的資料大小,預設是16k,當積壓的訊息達到這個值的時候就會統一傳送(發往同一分割槽的訊息)

linger.ms

這個設定是為傳送設定一定是延遲來收集更多的訊息,預設大小是0ms(就是有訊息就立即傳送)

滿足上述任意一條件即傳送訊息。

注: 如果批量16k或者時間大於0,  會有訊息延遲。  需視情況設定。預設是立即傳送

3 生產者基本邏輯:

producer producer = new kafkaproducer(props) 和 producer.send(msg, callback)

在建立 kafkaproducer 例項時,生產者應用會在後台建立並啟動乙個名為 sender 的執行緒,該 sender 執行緒開始執行時首先會建立與 broker 的連線。

1)初始化tcp連線

bootstrap.servers 引數。它是 producer 的核心引數之一,指定了這個 producer 啟動時要連線的 broker 位址。請注意,這裡的「啟動時」,代表的是 producer 啟動時會發起與這些 broker 的連線。因此,如果你為這個引數指定了 1000 個 broker 連線資訊,那麼很遺憾,你的 producer 啟動時會首先建立與這 1000 個 broker 的 tcp 連線。

2)元資料更新tcp連線

場景一:當 producer 嘗試給乙個不存在的主題傳送訊息時,broker 會告訴 producer 說這個主題不存在。此時 producer 會傳送 metadata 請求給 kafka 集群,去嘗試獲取最新的元資料資訊。然後重新整理連線

場景二:producer 通過 metadata.max.age.ms 引數定期地去更新元資料資訊。該引數的預設值是 300000,即 5 分鐘,也就是說不管集群那邊是否有變化,producer 每 5 分鐘都會強制重新整理一次元資料以保證它是最及時的資料

3)關閉tcp連線

a)主動關閉實際上是廣義的主動關閉,

呼叫 kill -9 主動「殺掉」producer 應用。當然最推薦的方式還是呼叫 producer.close() 方法來關閉

b)被動關閉(客戶端發起連線,broker 端關閉)

與 producer 端引數 connections.max.idle.ms 的值有關。預設情況下該引數值是 9 分鐘,即如果在 9 分鐘內沒有任何請求「流過」某個 tcp 連線,那麼 kafka 會主動幫你把該 tcp 連線關閉。

使用者可以在 producer 端設定 connections.max.idle.ms=-1 禁掉這種機制。一旦被設定成 -1,tcp 連線將成為永久長連線。當然這只是軟體層面的「長連線」機制,由於 kafka 建立的這些 socket 連線都開啟了 keepalive,因此 keepalive 探活機制還是會遵守的(tcp socket定期預設2小時會傳送訊號檢測對方,保持繼續連線,或者異常重新連線,或者檢測不到關閉)。

4 寫入確認過程

acks=0:如果生產者能夠通過網路把訊息傳送出去,那麼就認為訊息已經成功寫入kafka

acks=1:首領在收到訊息並把它寫入到分割槽資料檔案(linux檔案系統快取)時返回確認或錯誤響應

acks=all/-1(在0.9.0以前的版本,是用-1表示all):首領在返回確認或者錯誤響應之前,會等待所有同步副本都收到訊息

5  副本同步過程。

ifollower副本同步

follewer副本只做一件事情:向leader副本請求資料。

術語:起始位移(base sffset):表示副本當前第一條訊息的offset。

高水印值(high watermark,hw):副本高水印值。是指isr中min.insync.replicas指定數量的節點都已經複製完的訊息的offset。也是消費者所能獲取到的訊息的最大offset。超過hw值的所有訊息被視為「未提交成功的」,因而consumer是看不到的。

日誌末端位移(log end offset,leo):副本 日誌中下一條待寫入訊息的offset。

當生產者發布訊息到topic的某個分割槽時,訊息首先被傳遞到leader副本,然後follower副本向leader副本請求訊息,leader副本所在的broker推送訊息給follower副本,一旦有足夠的副本收到訊息,leader就會提交這個訊息(isr中副本同步完成,ack=-1情況下),消費者就能消費到這個訊息了。

如果副本發生故障,leader會把他從isr中剔除,那如果follower重啟後呢?follower重啟後會去leader上恢復最新的hw並將日誌截斷到hw,並繼續從leader中獲取hw以後的訊息,一旦完全趕上leader,副本將被重新加入到isr佇列中,系統將重新回到fully replicated模式。

那如果leader發生故障呢,其他follower會爭相競爭做leader,最終只有乙個follower競爭成功公升級成為leader,故障leader重啟後成為follower去新leader同步訊息。 

kafka producer 分割槽器

策略一 如果傳送訊息的時候,沒有指定key,輪詢達到負載均衡 策略二 這個地方就是指定了key,hash取模,相同的key打到同乙個分割槽上 int partition partition record,serializedkey,serializedvalue,cluster return par...

kafka producer 異常處理

sender.completebatch if error errors.none canretry batch,error on topic partition retrying attempts left error correlationid,batch.topicpartition,this...

三 Kafka Producer傳送訊息及分割槽策略

1 producer 實現 ps 不建議使用自定義序列化和反序列化,他們會把生產者和消費者耦合在一起,且容易出錯 同步傳送訊息 非同步傳送訊息 public class kafkaproducerdemo public static void main string args asyncsendme...