1 引言
2 eventtime傾斜
2.1 情形
2.2 排查思路
2.3 解決方式
3 消費不均勻
3.1 情形
3.2 解決方式
4 資料延時
4.1 情形
4.1 解決方式
在flink中,eventtime即事件時間,能夠反映事件在某個時間點發生的真實情況,即使在任務重跑情況也能夠被還原,計算某一段時間內的資料,那麼只需要將eventtime範圍的資料聚合計算即可,但是資料在上報、傳輸過程中難免會發生資料延時,進而造成資料亂序,就需要考慮何時去觸發這個計算,flink使用watermark來衡量當前資料進度,使用時間戳表示,在資料流中隨著資料一起傳輸,當到watermark達使用者設定的允許延時時間,就會觸發計算。但是在使用eventtime的語義中,會出現一些不可預知的問題,接下來會介紹幾個在使用過程中遇到的一些問題與解決辦法。
eventtime傾斜是指在有shuffle的操作中,乙個task會接受上游多個task的資料,同樣也會接受上游多個watermark,但是存在其中乙個task的watermark相對於其他task的watermark滯後很多的情況,根據watermark的對齊機制,會選擇多個通道最小watermark值,這樣就會導致下游基於eventtime操作一直無法觸發或者滯後觸發。這個問題在上篇個人思考中有提到過。
在處理上游kafka中業務資料時,將業務設定的唯一鍵作為傳送kafka資料的key,那麼相同鍵的資料被分配在相同的partition, 下游flink任務處理使用唯一鍵作為key進行keyby操作,然後使用processfunction處理,在processfunction中會註冊eventtime定時器,那麼就會根據watermark觸發ontimer操作。在任務上線執行良好一段時間後,收到反饋沒有結果輸出。
檢視任務日誌,沒有異常日誌;檢視任務消費情況,該topic的資料正常被消費;檢視背壓、gc指標,一切正常。最終還是要回歸到任務處理邏輯本身,資料的輸出由ontimer來觸發,而ontimer的呼叫則是由watermark來決定的,只有當watermark達到註冊的事件時間才會觸發ontimer操作,那麼出現問題的點應該就是watermark,於是檢視了該處理節點的watermark值,發現其值一直都處於乙個很低的水平,觀察發現就算有資料流入watermark值還是未被更新,此刻想到了watermark對齊機制的處理,於是檢視上游各個task的watermark情況,發現其中乙個task的watermark很長時間都未被更新,檢視資料流入情況也沒有發現有資料在持續的流入,於是檢視該task對應消費的partition監控,果然很長時間沒有資料產生,那麼原因也很明朗了:由於上游task一直沒有資料產生導致其watermark一直未更新,根據watermark對齊機制,在processfunction節點的watermark也會一直不更新導致無法處理計算。
在註冊eventtime定時器的同時註冊processingtime定時器,那麼processingtime定時器觸發是由系統時間來決定觸發的,隨著時間的推進一定會觸發輸出操作,對於eventtime觸發的輸出只需要做覆蓋即可。
@override
public void processelement(
itemviewcount input,
context context,
collectorcollector) throws exception
對於消費端為空的情況,例如設定的source並行度大於topic的分割槽數,那麼就會存在source task沒有資料可消費,在source端(flinkkafkaconsuerm)會傳送乙個idle的標誌,表示在下游watermark對齊機制中忽略該通道的值,就不會影響watermark的流轉。但是如果針對上面的情形,剛開始有資料但是後續無資料,就會造成watermark無法更新,對此flink在內部實現了idle-timeout的策略,在指定的timeout時間範圍內,沒有資料輸出,就會往下游傳送idle標誌,當有資料流入就會傳送active標誌,重新參與watermark的對齊機制,此功能在1.11版本前處於關閉的狀態,具體可看。
在乙個task消費多個partition的情況下,但是partition資料傾斜比較嚴重,對於目前kafkaconsumer還無法做到均勻的消費每乙個partition,就會導致從每個partition拿到的資料不均勻: 其中乙個partition拉取的資料多,另外的partition拉取的資料少,那麼在下游生成watermark的時候,消費多的partition的資料會提公升watermark的值,而後在去拉取消費滯後的partition資料,會判斷為延時資料從而被過濾掉。
在乙個eventtime-window計算的業務處理中,source-task與partition是一對多的情況,發現最終生成的資料圖表呈下降趨勢,檢視任務numlaterecordsdropped指標(表示延時丟棄的資料量),一直呈上公升趨勢,同時檢視到該topic的監控中,某些partition消費lag維持在幾百左右,某些卻在幾千左右,由此判斷由於消費不均勻導致watermark迅速被提公升,從而導致大量資料被判斷為延時資料所丟棄。
將source-task的並行度設定為與topic parition數一致,那麼對於每乙個task來說消費能力相當,watermark 能夠維持在乙個相對平穩的狀態,並且在watermark對齊過程中,會選擇值最小的通道watermark值,因此能夠解決消費不均勻的問題。
只要是在event-time語義的資料流中,就不可避免乙個問題:資料延時,通常情況下會設定乙個允許資料延時的大小,也許你會想將延時設定很大,那麼同樣帶來的問題就是增加了處理的延時性,對於處理要求實時的來說是不可取的。對於資料延時來說,通常的做法就是要麼將延時資料丟棄、要麼單獨處理延時資料。
首先了解一下對於window是如何處理延時資料的,在預設情況下window是會將延時資料丟棄的,如果你想進一步像設定watermark一樣再給window設定乙個容忍的延時程度,可以設定allowedlateness引數值(預設為0),表示視窗允許的延時,在判斷是否延時資料時會將allowedlateness算進去:資料所在視窗的endtime+allowedlateness<=currwatermark, 視窗的觸發條件仍然是endtime<=currwatermark,但是視窗狀態資料清理條件是endtime+allowedlateness<=currwatermark,因此當watermark到達觸發視窗條件但是未達到清理條件時,也就是在allowedlateness延時範圍內,每來一條資料就會觸發一次視窗的計算,同時也增加了視窗狀態的保留時間,對記憶體會造成一定的負擔。如果在設定allowedlateness後,仍然存在資料延時的情況,可設定sideoutputtag,單獨獲取到延時的資料而後做進一步的處理。
延時丟棄的方式是最為簡單的一種方式,同時也會對資料正確性造成一定的誤差,但是如果想處理延時資料,就需要考慮如何與已經輸出的資料做合併計算(例如:聚合操作),由於合併過程可能會出現任務失敗恢復情況,會導致重複合併,對於不允許重複合併的情況下,在這個過程中又需要考慮資料一致性的問題,可以使用flink提供的兩階段提交幫助完成。
Flink學習筆記4 Flink框架api介紹
1.aggregations aggregations通過keyedstream進行一些聚合操作,例如sum min max 等。示例如下 keyedstream.sum 0 或者keyedstream.sum key 2.connect connect和union類似,但是只能連線兩個流,兩個流的...
Flink中處理亂序資料的三種方式
加水印 flink中的時間語意watermark,以事件時間減去所允許的最大亂序時間作為水印,原理相當於多給了資料一定的時間,然後關閉視窗,觸發計算。允許遲到allowedlateness 原理是在水印的基礎上在多給資料一定的可以遲到的時間,當水印到達視窗大小時觸發計算,但是不關閉視窗,到達所允許的...
RocketMQ訊息亂序場景及解決方法
訊息亂序也是rocketmq中的乙個常見問題,那麼到底為什麼會出現訊息亂序呢?首先我們知道在rocketmq的topic中,會有多個messagequeue作為資料分片,每個messagequeue都會儲存部分的訊息,那麼在生產者產生訊息傳送到topic上的時候,按道理這些訊息都會均勻的分布在多個m...