session.timout.ms決定了consumer存活性的時間間隔
heartbeat.interval.ms決定存活心跳傳送間隔。
max.poll.interval.ms 它限定了consumer端應⽤程式兩次調⽤poll⽅法的最⼤時間間隔。
消費者例項在kafkaconsumer.poll建立tcp連線,主要分為3類連線:
確定協調者和獲取集群元資料。
連線協調者,令其執⾏組成員管理操作。
執⾏實際的訊息獲取。
第一類連線僅在開始前建立,稍後(第三類建立成功)就會銷毀,consumer例項會長期保留2,3類連線。
consumer例項會長期建立broker數量(分割槽所在broker數量)+1個連線。
tcp連線的三個時機:
發起findcoordinator請求時
連線協調者時
消費資料時
何時關閉tcp連線:
⼿動調⽤kafkaconsumer.close()⽅法
執⾏kill命令
kafka⾃動關閉(是由消費者端引數connection.max.idle.ms控制的,該引數現在的預設值是9分鐘)
mirrormaker
uber的ureplicator⼯具
linkedin開發的brooklin mirror maker⼯具
confluent公司研發的replicator⼯具
mirrormaker2.0已經發布(2.4.0+)基於connector重寫了mirrormaker。目前我們公司使用自研的kafka connector實現跨集群資料映象。
broker端引數replica.lag.time.max.ms引數值,這個引數的含義是follower副本能夠落後leader副本的最⻓時 間間隔,當前預設值是10秒。
broker端引數unclean.leader.election.enable控制是否允許unclean領導者選舉,true提高了高可用,降低了資料一致性。推薦設定為false
0.11.0以後的版本預設都是false,從2.1.0版本開始複寫動態配置的話,預設為啟用
num.network.threads 決定網路執行緒池的數量,預設值為3
num.io.threads 控制io執行緒池的數量,預設值為8
以下為常用broker動態配置,不用重啟broker即可生效
log.retention.ms。
num.io.threads和num.network.threads
與ssl相關的引數(ssl.keystore.type、ssl.keystore.location、ssl.keystore.password和ssl.key.password)。。
num.replica.fetchers。
當需要調整吞吐量的情況時可以考慮如下調整引數:
作用範圍
引數broker
1.適當增加數num.replica.fetchers數量,但不要超過cpu數量 2.調整gc引數以避免經常性gc
producer
1.增加訊息批次的⼤⼩以及批次快取時間,即batch.size和linger.ms 2.配置壓縮演算法,lz4/zstd 3.acks=0或1,4.retries=0 5.如果多個執行緒共享producer,適當增大buffer.memory
consumer
增加fetch.min.bytes引數值。預設是1位元組
當需要調整延時的情況時可以考慮如下調整引數:
作用範圍
引數broker
增加num.replica.fetchers數量
producer
1.設定linger.ms=0 2.不要啟⽤壓縮 3.置acks=1
幫助kafka完成副本同步
處理⽣產者請求的邏輯如下:
寫⼊訊息到本地磁碟。
更新分割槽⾼⽔位值。i. 獲取leader副本所在broker端儲存的所有遠端副本leo值。ii. 獲取leader副本leo:leader_leo。iii. 更新currenthw = min((leader_leo, leo-1,leo-2,……,leo-n)。
處理follower副本拉取訊息的邏輯如下:
讀取磁碟(或⻚快取)中的訊息資料。
使⽤follower副本傳送請求中的位移值更新遠端副本leo值。
更新分割槽⾼⽔位值(具體步驟與處理⽣產者請求的步驟相同)。
從leader拉取訊息的處理邏輯如下:
寫⼊訊息到本地磁碟。
更新leo值。
更新⾼⽔位值。i. 獲取leader傳送的⾼⽔位值:currenthw(leader)。ii. 獲取步驟2中更新過的leo值:currentleo。iii. 更新⾼⽔位為min(currenthw, currentleo)。
文字比較費解,還是看圖。原文和我寫的有點出入,不過我感覺這樣才是正確的!
所謂leader epoch,我們⼤致可以認為是leader版本。它由兩部分資料組成。
epoch。⼀個單調增加的版本號。每當副本領導權發⽣變更時,都會增加該版本號。⼩版本號的leader被認為是過期 leader,不能再⾏使leader權⼒。
起始位移(start offset)。leader副本在該epoch值上寫⼊的⾸條訊息的位移。
kafka broker會在記憶體中為每個分割槽都快取leader epoch資料(會持久化,leader-epochcheckpoint⽂件),來避免重啟截斷日誌的情況發生。
歡迎使用,
歡迎感興趣的同學加入我們,做點感興趣的事。
加入開源,加入我們。
kafka學習總結之kafka核心
1 kafka核心元件 1 replication 副本 partition 分割槽 乙個topic可以有多個副本,副本的數量決定了有多少個broker存放寫入的資料 副本是以partition為單位的,存放副本即是備份若干個partition,但是只有乙個partition被選為leader用於讀...
Kafka系列 Kafka核心概念
kafka系列文章 kafka系列 入門及應用場景 部署 簡單測試 bin kafka topics.sh create zookeeper 192.168.137.141 2181,192.168.137.142 2181,192.168.137.143 2181 kafka replicatio...
Kafka核心總結
乙個kafka的message由乙個固定長度的header和乙個變長的訊息體body組成。header部分由乙個位元組的magic 檔案格式 和四個位元組的crc32 用於判斷body訊息體是否正常 構成。當magic的值為1時,會在magic和crc32之間多乙個位元組的資料 attributes...