眾所周知,kafka在目前的大資料市場中非常的火爆,因為kafka同時承擔了以下3種角色。
訊息儲存
流式處理框架
那麼kafka的整體架構如下:
kafka訊息是如何到達kafka集群的呢,並不是簡單的send一下,xiuxiuxiu~~就到了,話不多說,先看生產者架構
上圖中的recordaccumulator是乙個訊息累加器(收集器),主要是方便sender執行緒可以批量傳送資料到kafka集群。
整個生產者由2個執行緒協調工作:主線程、sender,具體流程如下:
sender執行緒以分割槽-批次的形式從累加器中獲取訊息並建立傳送請求;
sender將請求新增到請求對列,按照優先順序將請求傳送到selector,selector負責與kafka互動;
然後kafka將響應通過selector返回給到sender執行緒,執行緒根據響應決定是否將請求從佇列中剔除;
假如響應傳送成功,那麼會剔除已經響應的請求,並將訊息累加器中已經傳送的資料清理掉。
kafka的生產者在傳送訊息的時候只有topic的資訊,最終所有的topic在訊息收集器中以這樣的資料結構組織:>,在sender執行緒資料結構最終封裝之前,會從kafka集**送metadatarequest,從而獲取kafka集群的元資料資訊,該元資料報括
首先producer需要知道該topic有哪些分割槽,然後獲取該分割槽的所有partition,然後算出record所在的partition,然後找到該partition的leader副本所在的nodeid,然後才會將請求封裝在inflightrequests中。等待選擇器去將請求傳送給kafka集群。
記住,最終sender執行緒會會將封裝好全部放在inflightrequests中,inflightrequests的資料結構如下:
它是乙個map,map的key是每個node的id,為string型別,value為該node對應的未響應的請求,selector在選擇請求的時候,首先會選取出leastloadednode,即當前負載最小的乙個kafka節點,然後將該節點下的請求快速傳送,防止太多的阻塞影響整個生產者的進度。
在上圖中,很明顯node1的load是最小的,因為該node1對應的deque的size是最小的,所以該node就是leastloadednode,sender執行緒會為該leastloadednode傳送乙個metadatarequest從而獲取當前kafka集群的元資料資訊,然後通過selector將請求傳送給kafka集群。
在生產中指定的引數中boostrap.server
引數,只需要指定乙個kafka-node及id,因為client可以通過該server獲取整個kafka集群的資訊.
metadata rnax.age.rns
引數預設為300000ms,即5分鐘,該引數為client獲取元資料更新的等待時間。
Kafka之生產者
1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...
kafka 生產者(二)
想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...
Kafka之生產者 筆記一
文章內容選自 kafka技術內幕 1.訊息系統通常是由三大塊組成 生產者 消費者 訊息 功能 生產者會將訊息寫入訊息 中,消費者會從訊息 中讀取訊息。對於訊息 而言,消費者和生產者都是客戶端。2.通訊步驟 1.生產者客戶端應用產生訊息。2.生產者包裝訊息到請求頭中,傳送到客戶端。3.服務端物件負責接...