flink watermark原理總結

2021-10-02 09:23:04 字數 3944 閱讀 3060

通過視窗對input按照eventtime進行聚合,使得大體按照event time 發生的順序去處理資料,同時利用watermark來觸發視窗。(watermark + window機制)

watermark是flink為了處理eventtime時間型別(其他時間型別不考慮亂序問題)的視窗計算提出的一種機制,本質上也是一種時間戳。watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。

當operator通過基於event time的時間視窗來處理資料時,它必須在確定所有屬於該時間視窗的訊息全部流入此操作符後,才能開始處理資料。但是由於訊息可能是亂序的,所以operator無法直接確認何時所有屬於該時間視窗的訊息全部流入此操作符。watermark包含乙個時間戳,flink使用watermark標記所有小於該時間戳的訊息都已流入,flink的資料來源在確認所有小於某個時間戳的訊息都已輸出到flink流處理系統後,會生成乙個包含該時間戳的watermark,插入到訊息流中輸出到flink流處理系統中,flink operator運算元按照時間視窗快取所有流入的訊息,當操作符處理到watermark時,它對所有小於該watermark時間戳的時間視窗的資料進行處理併發送到下乙個操作符節點,然後也將watermark傳送到下乙個操作符節點。

流處理的過程中,從事件產生,到流經source,operator,中間是有乙個過程和時間的,雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是由於網路,背壓等原因,導致亂序的產生。(out-of-order或者說late element)

對於late element,我們不能無限期的等下去,必須有乙個機制保證在特定的時間後,必須觸發window去計算,這個特別的機制就是watermark。

通常,在接收到source的資料後,會立刻生成watermark;但是,也可以在source之後,應用簡單的map、filter再生成watermark。

生成watermark的方式有兩種:

1.periodic watermarks

periodic watermarks,週期性的產生watermark,即每隔一定時間間隔或者達到一定的記錄條數,產生乙個watermark。

而在實際的生產中,periodic方式必須結合時間和記錄數兩個維度,否則,在極端情況下容易產生很大的延時。

2.punctuated watermarks
punctuated watermarks,資料流中每乙個遞增的event time 都會產生乙個watermark。

在實際的生產中,punctuated 方式在tps很高的場景下會產生大量的watermark,

在一定程度上對下游運算元造成壓力,所以只有在實時性要求非常高的場景才會選擇punctuated方式。

注意:為什麼watermark=currentmaxtimestamp - maxlatetime?

假設不考慮延遲,watermark=currentmaxtimestamp,隨著水位線的上公升,當水位線(即當前最大時間)超過endtime時,所有的資料已經全部進入該視窗了。

繼續考慮存在延遲的情況,為了使得延時了maxlatetime的資料全部進入視窗,預先讓水位線下降maxlatetime,在這種情況下,當水位線依然超過endtime時,表明在允許延遲的情況下,所有資料全部進入該視窗了。

如果順序,只需要最新的event time >=windowendtime,視窗就會觸發。

如果亂序,由於要等 maxlatetime,所以最新的event time - maxlatetime >=windowendtime時,視窗觸發。這裡取其次,只要currentmaxtimestamp- maxlatetime>=windowendtime,視窗就會觸發。

/**

*flink 1.7

hello,2019-09-17 11:34:05.890

hello,2019-09-17 11:34:07.890

hello,2019-09-17 11:34:13.890

hello,2019-09-17 11:34:08.890

hello,2019-09-17 11:34:16.890

hello,2019-09-17 11:34:19.890

hello,2019-09-17 11:34:21.890

*/public

class

watermarktest})

.keyby(0

).timewindow

(time.

seconds(10

))// .sum(1)

//自定義的乙個計算規則....(

newmywindowfunction()

).printtoerr()

;try

catch

(exception e)}}

/* *資料進來,先extract時間,同時更新max值,再生成watermark

*/class

mywatermark

implements

assignerwithperiodicwatermarks

/***

* @param element 流中的資料 形如:"hello,2019-09-17 10:24:50.958"

* @param previouselementtimestamp 上條資料的時間戳

* @return 新的時間戳

*/@override

public

long

extracttimestamp

(string element,

long previouselementtimestamp)

catch

(parseexception e)

//對比新資料的時間戳和目前最大的時間戳,取大的值作為新的時間戳

currentmaxtimestamp = math.

max(timestamp, currentmaxtimestamp)

; system.err.

println

(key +

", 本條資料的時間戳: "

+ timestamp +

","+format.

format

(timestamp)

+"|目前資料中的最大時間戳: "

+ currentmaxtimestamp +

","+ format.

format

(currentmaxtimestamp)

+"|水位線時間戳: "

+ wm +

","+ format.

format

(wm.

gettimestamp()

));return timestamp;}}

class

mywindowfunction

implements

windowfunction

, string, tuple, timewindow>

long start = window.

getstart()

;long end = window.

getend()

; out.

collect

("key:"

+ tuple.

getfield(0

)+" value: "

+ sum +

"| window_start :"

+ format.

format

(start)

+" window_end :"

+ format.

format

(end));

}}

Flink WaterMark機制白話分析

最近遇見乙個流處理的資料嚴重遲到亂序的場景,基於saprk streaming開發的統計使用者頁面停留時間。使用的思想是 遲到資料的時間補償機制。由於spark不支援亂序的支援,所以自行實現了乙個容器儲存一定量的歷史資料,最後對遲到的資料插到歷史容器中,對插入資料的位置進行區域性計算求補償時間最後新...

Flink WaterMark 的了解 更新

我來講下 我理解的watermark 一起 如果要去了解乙個東西 我認為需要從以下幾個點去分析 1.是什麼 概念一點 2.解決了什麼問題以及是如何解決的 3.如果去使用 如何下手 4.他會牽扯的一些問題 好了 讓我們去 這個東西 1.首先 watermark 是乙個全域性標籤,本身是乙個時間戳 2....

Flink WaterMark原理與實現

在使用 eventtime 處理 stream 資料的時候會遇到資料亂序的問題,流處理從 event 事 件 產生,流經 source,再到 operator,這中間需要一定的時間。雖然大部分情況下,傳輸到 operator 的資料都是按照事件產生的時間順序來的,但是也不排除由於網路延遲等原因而導致...