kafka0.11.0.0版本正式支援精確一次處理語義exactly once semantic–eos
kafka冪等性參考
1)冪等producer 保證單個分割槽的只會傳送一次,不會出現重複訊息
2)事務(transation):保證原子性的寫入多個分割槽,即寫入到多個分割槽的訊息要麼全部成功,要麼全部回滾
3)流式eos:流處理本質上可看成是「」讀取-處理-寫入的管道「」。此eos保證整個過程的操作是原子性。注意,只使用kafka stream
重複問題:rebalance問題,
1.通常會遇到消費的資料處理很耗時,導致超過了kafka的session timeout時間(0.10.x版本預設是30秒),那麼就會rebalance重平衡,此時有一定機率offset沒提交,會導致重平衡後重複消費。
2.或者關閉kafka時,如果在close之前,呼叫consumer.unsubscribe()則可能有部分offset沒提交,下次重啟會重複消費。
3.消費程式和業務邏輯在乙個執行緒,導致offset提交超時。
try catch (exception e)
try catch (exception e)
解決方法:
配置解決:offset自動提交為false!
業務邏輯解決:自己提交偏移量,讓consumer邏輯冪等
啟用冪等producer:在producer程式中設定屬性enabled.idempotence=true,若要實現多分割槽上的原子性,需要引入事務,啟用事務支援:在producer程式中設定屬性transcational.id為乙個指定字串(你可以認為這是你的額事務名稱,故最好七個有意義的名字),同時設定enable.idempotence=true
冪等實現業務實現參考
生產者資料不丟失
同步模式:配置=1(只有leader收到,-1所有副本成功,0不等待)。leader partition掛了,資料就會丟失。
解決:設定為-1保證produce寫入所有副本算成功
producer.type=sync
request.required.acks=-1
非同步模式,當緩衝區滿了,如果配置為0(沒有收到確認,一滿就丟棄),資料立刻丟棄
解決:不限制阻塞超時時間。就是一滿生產者就阻塞
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
丟包問題:傳送資料過快,導致伺服器網絡卡爆滿,或者磁碟處於繁忙狀態,可能會出現丟包現象。
丟包解決方法:
1.啟用重試機制,重試間隔時間設定長一些
2.設定生產者(ack=all 代表至少成功傳送一次) ,即需要相應的所有處於isr的分割槽都確認收到該訊息後,才算傳送成功。
3.對kafka進行限速(限速可能會引起rebalance問題)
rebalance問題參考可解決方案
消費者資料不丟失
receiver(開啟wal,失敗可恢復)和director(checkpoint保證)
kafka丟失和重複消費資料
kafka作為當下流行的高併發訊息中介軟體,大量用於資料採集,實時處理等場景,我們在享受他的高併發,高可靠時,還是不得不面對可能存在的問題,最常見的就是丟包,重發問題。1 丟包問題 訊息推送服務,每天早上,手機上各終端都會給使用者推送訊息,這時候流量劇增,可能會出現kafka傳送資料過快,導致伺服器...
Kafka重複消費和丟失資料研究
kafka重複消費原因 底層根本原因 已經消費了資料,但是offset沒提交。原因1 強行kill執行緒,導致消費後的資料,offset沒有提交。原因2 設定offset為自動提交,關閉kafka時,如果在close之前,呼叫 consumer.unsubscribe 則有可能部分offset沒提交...
kafka如何保證訊息不丟失不被重複消費
在解決這個問題之前,我們首先梳理一下kafka訊息的傳送和消費機制。kafka的訊息傳送機制分為同步和非同步機制。可以通過producer.type屬性進行配置。使用同步模式的時候,有三種狀態來保證訊息的安全生產。可以通過配置request.required.acks屬性。三個屬性分別如下 當ack...