kafka如何保證資料不重複消費,不丟失資料
不重複消費:
1.冪等操作,重複消費不會產生問題
2. dstream.foreachrdd {(rdd, time) =
rdd.foreachpartition { partitioniterator =>
val partitionid = taskcontext.get.partitionid()
val uniqueid = generateuniqueid(time.milliseconds,partitionid)將uniqueid存入資料庫中
//use this uniqueid to transationally commit the data in partitioniterator
對每個partitionid,產生乙個uniqueid,.只有這個partition的資料被完全消費,才算成功,否則失敗回滾。下次若重複執行,就skip
不丟失資料:丟失情況:
1.生產者資料不丟失
同步模式:配置=1(只有leader收到,-1所有副本成功,0不等待)。leader partition掛了,資料就會丟失。
解決:設定為-1保證produce寫入所有副本算成功
producer.type=sync
request.required.acks=-1
非同步模式,當緩衝區滿了,如果配置為0(沒有收到確認,一滿就丟棄),資料立刻丟棄
解決:不限制阻塞超時時間。就是一滿生產者就阻塞
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
2.消費者資料不丟失 :流計算,基本資料來源不適用。高階資料來源以kafka為例,由2種方式:receiver(開啟wal,失敗可恢復)和director(checkpoint保證)
3. 若是storm在消費,開啟storm的ackfail機制;若不是storm,資料處理完更新offset,低階api手動控制offset
4. kafka傳送資料過快,導致伺服器網絡卡流量暴增。或磁碟過忙,出現丟包。
1》 首先,對kafka進行限速,
2》 其次啟用重試機制,使重試間隔變長。
3》 kafka設定ack=all,即需要處於isr(副本列表)的分割槽都確認,才算傳送成功。 rops.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all")表示至少成功傳送一次;
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);props.put("retry.backoff.ms", 20000)
5.消費者速度很慢,導致乙個session週期(0.1版本是預設30s)內未完成消費。導致心跳機制檢測報告出問題。
導致消費了的資料未及時提交offset.配置由可能是自動提交
問題場景:1.offset為自動提交,正在消費資料,kill消費者執行緒,下次重複消費
2.設定自動提交,關閉kafka,close之前,呼叫consumer.unsubscribed()則由可能部分offset沒有提交。
3.消費程式和業務邏輯在乙個執行緒,導致offset提交超時,
kafka是如何保證訊息不被重複消費的
一 kafka自帶的消費機制 kafka有個offset的概念,當每個訊息被寫進去後,都有乙個offset,代表他的序號,然後consumer消費該資料之後,隔一段時間,會把自己消費過的訊息的offset提交一下,代表我已經消費過了。下次我要是重啟,就會繼續從上次消費到的offset來繼續消費。但是...
kafka如何保證訊息不丟失不被重複消費
在解決這個問題之前,我們首先梳理一下kafka訊息的傳送和消費機制。kafka的訊息傳送機制分為同步和非同步機制。可以通過producer.type屬性進行配置。使用同步模式的時候,有三種狀態來保證訊息的安全生產。可以通過配置request.required.acks屬性。三個屬性分別如下 當ack...
如何保證訊息不被重複消費
如何保證訊息不被重複消費啊 如何保證訊息消費時的冪等性 首先就是比如rabbitmq rocketmq kafka,都有可能會出現消費重複消費的問題,正常。因為這問題通常不是mq自己保證的,是給你保證的。然後我們挑乙個kafka來舉個例子,說說怎麼重複消費吧。kafka實際上有個offset的概念,...