我們經常會遇到kafka資料丟失的問題,所以將遇到過的或有可能造成資料丟失的問題進行個小總結。
其實在kafka處理資料的流程有很多,把這些流程梳理一遍,有助於分析資料丟失的情況,從這個圖中可以看出資料流向,圖中涉及的所以過程都可能造成資料的丟失。
首先要確定是否有業務資料寫入
再明確資料是在kafka之前就已經丟失還是消費端丟失資料的?
2.1 如果是在寫入端丟失資料,那麼每次結果應該完全一樣(在寫入端沒有問題的前提下)。
2.2 如果是在消費端丟失資料,那麼換個消費group重新消費,多次消費結果完全一模一樣的機率很低。
丟失問題:若生產時是同步模式,那麼訊息一旦生產,就會阻塞到直到收到server端的確認。但非同步模式下,訊息不會立刻server端,而是在客戶端的緩衝區中進行快取,快取到指定大小或指定時間後,再傳送給server。
所以,若在非同步模式下,業務已經生產了資料,但還沒來得及傳送給server端時,server端就crash了,並且在重傳週期內,server端一直未恢復,那麼此訊息就會丟失。
效能問題:選擇同步模式,雖然很穩妥,但是每次都需要leader再向客戶端傳送確認。這個會降低傳送速度。但選用非同步模式,雖然速度是加快了,但是無法保證準確到達,快取的越多,到達時間會變的久一些,並且快取越大,傳送也可能會變更慢。
優化:在producer端有幾個引數會控制傳送大小和重傳的引數。
即使kafka收到了訊息,仍然可能丟失,因為kafka收到訊息後,並不是立刻落盤,而是存在了快取中,若在此階段kafka異常或磁碟壞掉,那麼此訊息仍會丟失。
解決:修改kafka的配置引數,調整flush到檔案的時間和條數
log.flush.interval.messages=10000
log.flush.interval.ms=3000
單批資料的長度超過kafka的限制時,會丟失資料,報kafka.common.messagesizetoolargeexception異常。
message.max.bytes=20000000(broker能接收訊息的最大位元組數,這個值應該比消費端的fetch.message.max.bytes更小才對,否則broker就會因為消費端無法使用這個訊息而掛起)
fetch.message.max.bytes=20485760
這條配置要符合 client的生產大小在此階段,會批量從server端讀取資料,如果設定自動提交位移,那麼有可能存在還未被業務側讀取,但offset已更新的情況,那麼資料就會丟失:
enable.auto.commit:是否自動提交位移,如果為false,則需要在程式中手動提交位移。
關於效能優化
總結如下:
在考慮效能問題時,根據資料的特點和要求,需要考慮:
是單程序還是多程序生產
是同步還是非同步生產,是否需要全部副本都確認
調整batch_size的大小,適當增大batch 大小可以來減小網路io和磁碟io的請求
是否需要多個partiton,分割槽是kafka進行並行讀寫的單位,是提公升kafka速度的關鍵。
是否需要幾個副本,副本越多,代價就是需要更多資源,尤其是磁碟資源,需要在副本數和可靠性之間平衡
是否需要開啟壓縮
消費時,是否需要多程序消費
rebalance問題
在專案中時常遇到rebalance問題,單獨小結一下:
觸發rebalance的條件有三種:
組成員發生變更(新 consumer 加入組、已有 consumer 主動離開組或已有 consumer 崩潰了)
訂閱主題數發生變更
訂閱主題的分割槽數發生變更
我們常遇到的就是consumer沒有和server沒有保持活躍,導致server認為此consumer已退出,所以需要rebalance。而此時offset還未提交,所以會有重複消費的問題。
而沒有保持活躍的原因有多種:
1)處理的執行緒被kill
2)消費者消費太慢,導致超時
3)消費太快,導致消費者處於空poll的狀態,阻塞傳送心跳執行緒;
以上原因都會讓server認為需要rebalance。
優化方案乙個是:降低max_poll_records(預設500)或提高metadata_max_age_ms(預設5分鐘強制重新整理metadata)
另乙個解決方案是修改heartbeat_interval_ms(預設3秒)和metadata_max_age_ms(預設5分鐘)為差不多的大小,例如將metadata_max_age_ms修改為3s
kafka丟失和重複消費資料
kafka作為當下流行的高併發訊息中介軟體,大量用於資料採集,實時處理等場景,我們在享受他的高併發,高可靠時,還是不得不面對可能存在的問題,最常見的就是丟包,重發問題。1 丟包問題 訊息推送服務,每天早上,手機上各終端都會給使用者推送訊息,這時候流量劇增,可能會出現kafka傳送資料過快,導致伺服器...
Kafka 訊息丟失和訊息重複消費
producer 的acks引數值設定為 0 或者 1 不等待伺服器確認或者只讓leader確認解決方法 將acks的值設定為all或者 1,讓leader和followers全部進行確認 producer 沒有設定失敗重試解決方法 根據實際場景將retries引數值設定為正整數 consumerp...
Kafka 訊息丟失和訊息重複消費
producer 的acks引數值設定為 0 或者 1 不等待伺服器確認或者只讓leader確認解決方法 將acks的值設定為all或者 1,讓leader和followers全部進行確認 producer 沒有設定失敗重試解決方法 根據實際場景將retries引數值設定為正整數 consumerp...