在使用 eventtime 處理 stream 資料的時候會遇到資料亂序的問題,流處理從 event(事 件)產生,流經 source,再到 operator,這中間需要一定的時間。雖然大部分情況下,傳輸到 operator 的資料都是按照事件產生的時間順序來的,但是也不排除由於網路延遲等原因而導致亂序的產生,特別是使用 kafka 的時候,多個分割槽之間的資料無法保證有序。因此, 在進行 window 計算的時候,不能無限期地等下去,必須要有個機制來保證在特定的時間後, 必須觸發 window 進行計算,這個特別的機制就是 watermark(水位線)。watermark 是用於 處理亂序事件的。
在 flink 的視窗處理過程中,如果確定全部資料到達,就可以對 window 的所有資料做 視窗計算操作(如彙總、分組等),如果資料沒有全部到達,則繼續等待該視窗中的資料全 部到達才開始處理。這種情況下就需要用到水位線(watermarks)機制,它能夠衡量資料處 理進度(表達資料到達的完整性),保證事件資料(全部)到達 flink 系統,或者在亂序及 延遲到達時,也能夠像預期一樣計算出正確並且連續的結果。當任何 event 進入到 flink 系統時,會根據當前最大事件時間產生 watermarks 時間戳。
如何計算watermark的值?
watermark = 進入 flink 的最大的事件時間(mxteventtime)— 指定的延遲時間(t)
有watermark 的 window 是怎麼觸發視窗函式?
如果有視窗的停止時間等於或者小於maxeventtime – t(當時的 warkmark),那麼 這個視窗被觸發執行
watermark 的使用存在三種情況:
1. 本來有序的 stream 中的 watermark
如果資料元素的事件時間是有序的,watermark 時間戳會隨著資料元素的事件時間按順 序生成,此時水位線的變化和事件時間保持一直(因為既然是有序的時間,就不需要設定延 遲了,那麼 t 就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想狀態下的水位 線。當 watermark 時間大於 windows 結束時間就會觸發對 windows 的資料計算,以此類推,下乙個 window 也是一樣。
2.亂序事件中的 watermark
現實情況下資料元素往往並不是按照其產生順序接入到 flink 系統中進行處理,而頻繁出現亂序或遲到的情況,這種情況就需要使用 watermarks 來應對。比如下圖,設定延遲時 間t為2
3.並行資料流中的 watermark
在多並行度的情況下,watermark 會有乙個對齊機制,這個對齊機制會取所有 channel中最小的 watermark。
1.有序的watermark
object watermark1 ).assignascendingtimestamps(_.calltime) // 資料有序的公升序watermark
.filter(_.calltype.equals("success"))
.keyby(_.sid)
.timewindow(time.seconds(10), time.seconds(5))
.reduce(new myreducefunction(), new returnmaxtimewindowfunction)
env.execute("assignascendingtimestampsdemo")
}
2.無序的watermarkobject watermark2 )
// 資料是亂序的,延遲時間為3秒,週期性watermark
/*** 第一種實現
*/val ds = stream.assigntimestampsandwatermarks(new boundedoutofordernesstimestampextractor[log](time.seconds(3))
})/**
* 第二種實現
*/val ds2 = stream.assigntimestampsandwatermarks(new assignerwithperiodicwatermarks[log]
// 設定eventtime是哪個屬性
override def extracttimestamp(element: log, previouselementtimestamp: long): long =
})env.execute("assigntimestampsandwatermarksdemo")
}
with punctuated(間斷性的) watermarkval env = streamexecutionenvironment.getexecutionenvironment
// 使用eventtime
env.setstreamtimecharacteristic(timecharacteristic.eventtime)
//讀取檔案資料
val data = env.sockettextstream("flink101",8888)
.map(line=>)
// 生成watermark
data.assigntimestampsandwatermarks(
new mycustomerpunctuatedwatermarks(3000l)) //自定義延遲
}class mycustomerpunctuatedwatermarks(delay:long) extends assignerwithpunctuatedwatermarks[stationlog]else }
override def extracttimestamp(element: stationlog, previouselementtimestamp: long): long =
}
以上三種watermark的實現,根據資料的事件時間是否有延遲和業務需求選擇相應的生成watermark的方法。 flink watermark原理總結
通過視窗對input按照eventtime進行聚合,使得大體按照event time 發生的順序去處理資料,同時利用watermark來觸發視窗。watermark window機制 watermark是flink為了處理eventtime時間型別 其他時間型別不考慮亂序問題 的視窗計算提出的一種機...
Flink WaterMark機制白話分析
最近遇見乙個流處理的資料嚴重遲到亂序的場景,基於saprk streaming開發的統計使用者頁面停留時間。使用的思想是 遲到資料的時間補償機制。由於spark不支援亂序的支援,所以自行實現了乙個容器儲存一定量的歷史資料,最後對遲到的資料插到歷史容器中,對插入資料的位置進行區域性計算求補償時間最後新...
Flink WaterMark 的了解 更新
我來講下 我理解的watermark 一起 如果要去了解乙個東西 我認為需要從以下幾個點去分析 1.是什麼 概念一點 2.解決了什麼問題以及是如何解決的 3.如果去使用 如何下手 4.他會牽扯的一些問題 好了 讓我們去 這個東西 1.首先 watermark 是乙個全域性標籤,本身是乙個時間戳 2....