實時任務資料丟失

2021-10-09 12:12:43 字數 1305 閱讀 9536

flink實時任務:

從kafka集群讀取源資料

從redis定期全量拉取使用者白名單,然後進行廣播

源資料connect白名單資料,源資料根據白名單資料進行過濾處理

過濾處理完後的資料,http推送、寫redis、寫log等

上線驗證的時候,有些資料丟失,而且比較頻繁,分析可能原因:

kafka源資料丟失。這個有可能,但是小概率事件,不應該那麼頻繁出現。

redis拉取白名單後廣播操作。這個有可能,需要重點排查。

過濾以及資料處理。這個不太可能,可以新增日誌證明自己猜想,實際也是沒有問題的。

http推送、寫redis、寫log等錯誤。這些操作比較穩定,也沒有太大問題。

分析**,有兩個問題

if (in2._2.nonempty)

上述**有三個問題:一是clear後可能剛好有有資料到達,但還put還沒有生效,這時資料無法正確處理;二是clear操作其實可以刪除,put操作時會刪除舊和更新最新值;三是沒有比較新舊值,直接put操作,頻率寫操作,效能低下,事實上白名單也並不經常更新,所以put之前最好判斷一下。

正確處理方式:應該獲取廣播變數,與最新資料對比,如果不相等,才更新廣播,優化後如下 :

// 更新狀態

if (value._2.nonempty)

}

變數廣播問題解決之後,發現還是有資料丟失。仔細跟蹤,生產端是雙活生產,就是資料可能隨機寫兩個kafka集群,但消費端將兩個fafka集群的brokers合成當作乙個kafka集群的brokers。實際兩個集群的brokers之間是有分號隔開的。

示例**如下

datastream> stream = env.addsource(getconsumer(params, brokers));

將兩個或者多個集群的brokers按分號隔開,建立兩個或者多個datastream,然後再union,修改後如下

datastream> streams = null;

listbrokerlist = arrays.aslist(params.get(game + ".kafka.source.brokers").split(";"));

for (int index = 0; index < brokerlist.size(); ++index) else

}

重新驗證問題,資料丟失的問題解決。 

實時任務頻寬控制

proc sys kernel sched rt runtimes us,預設 950000 proc sys kernel sched rt period us,預設 1000000 在使用該功能時,當實時任務的頻寬用盡時 sched rt runtime us 核心會將對應的實時執行佇列rt r...

實時任務 offset管理

背景 目前我們執行的實時任務基本上都是使用sparkstreaming,當然後面考慮使用最近比較火的flink,看了部分資料介紹後,我感覺sparkstreaming相對於flink,唯一的不足是,sparkstreaming在task排程上損耗了不少效能。flink還沒有深入研究內部實現,flin...

linux實時任務排程演算法分析

鑑於最近有關cpu占有率的一些問題涉及到linux核心的排程演算法,有必要進行了解。因此,寫了這篇文章。linux常見的任務有兩種,實時任務與非實時任務。實時任務的排程演算法是大家都非常熟悉的優先順序搶占或優先順序搶占加時間片兩種,其主要思想是效率優先。非實時任務的排程演算法是cfs 完全公平演算法...