在flink中的時間視窗中有個重要概念,就是watermark,也就是我們經常談論的水印,這裡我們不對水印的概念和使用方式進行介紹,這裡從原始碼的角度來看,如何不斷的生成水印。
在flink中,有兩種水印timestampsandpunctuatedwatermarksoperator
timestampsandperiodicwatermarksoperator
我們編寫原因水印的**如下:
//抽取timestamp和生成watermarkdatastream> watermarkstream =
inputmap.assigntimestampsandwatermarks(
new assignerwithperiodicwatermarks>() {
long currentmaxtimestamp = 0l;
final long maxoutoforderness = 10000l; // 最大允許的亂序時間是10s
@nullable
@override
public watermark getcurrentwatermark() {
return new watermark(currentmaxtimestamp - maxoutoforderness);
//定義如何提取timestamp@override
public long extracttimestamp(tuple2 element, long previouselementtimestamp) {
long timestamp = element.f1;
return timestamp;
timestampsandpunctuatedwatermarksoperator
是乙個流運算子,生成水印是根據輸入的元素,沒輸出乙個元素,就會輸出乙個水印,如果不想輸出水印,那麼就輸出乙個null,核心**
public void processelement(streamrecord element) throws exception {
final t value = element.getvalue();
// 通過使用者的**獲取到事件時間,注入到element裡面就直接往下個opeartor傳送final long newtimestamp = userfunction.extracttimestamp(value,
element.hastimestamp() ? element.gettimestamp() : long.min_value);
output.collect(element.replace(element.getvalue(), newtimestamp));
//通過使用者**獲取水印,這裡會判斷水印是否為null//不為null的就直接往下游emit 了final watermark nextwatermark = userfunction.checkandgetnextwatermark(value, newtimestamp);
if (nextwatermark != null && nextwatermark.gettimestamp() > currentwatermark) {
currentwatermark = nextwatermark.gettimestamp();
output.emitwatermark(nextwatermark);
上面的方法中,我們每乙個元素的處理,都會呼叫processelement 方法,引數就是處理的滅乙個元素,方便內部主要做下面幾件事:
1、從使用者的**中獲取事件時間,然後注入到element中,然後傳送到下乙個operator中
2、通過使用者的**獲取定義的水印,如果水印不為null,那麼就emit到下游
根據上面的分析,我們可知,如果存在水印,那麼每乙個元素後就會輸出乙個水印。
timestampsandperiodicwatermarksoperator
這也是乙個流操作,定義水印的生成方式,從類名字中的periodic,我們可以猜測這是乙個週期性生成水印的操作,我們從類中看核心**:
public void open() throws exception {
super.open();
currentwatermark = long.min_value;
// 獲取週期性生成水印的間隔watermarkinterval = getexecutionconfig().getautowatermarkinterval();
// 週期性水印,是通過處理時間來實現的,一開始會獲取當前的真實時間+我們設定的水印間隔 來作為乙個定時觸發器if (watermarkinterval > 0) {
// 獲取當前的處理時間long now = getprocessingtimeservice().getcurrentprocessingtime();
getprocessingtimeservice().registertimer(now + watermarkinterval, this);
open 方法時這個類的初始化方法,我們可以從上面的**中看到,在open方法中,先從我們的環境配置中獲取週期生成水印的時間間隔watermarkinterval ,如果時間間隔大於0,那麼就獲取當前的時間,然後註冊乙個process定時器,下次觸發的時間是now+watermarkinterval ,從這裡我們可以看到,這個類生成水印是需要借助processtime服務的。
// 到了一定的間隔時間 會觸發onprocessingtime 這個方法裡面的內容@override
public void onprocessingtime(long timestamp) throws exception {
// register next timerwatermark newwatermark = userfunction.getcurrentwatermark();
if (newwatermark != null && newwatermark.gettimestamp() > currentwatermark) {
currentwatermark = newwatermark.gettimestamp();
// emit watermark 傳送乙個水印output.emitwatermark(newwatermark);
// 繼續註冊乙個以當前時間+間隔,作為乙個定時器 ,這樣乙個週期性觸發水印往下游傳送的實現就完成了long now = getprocessingtimeservice().getcurrentprocessingtime();
getprocessingtimeservice().registertimer(now + watermarkinterval, this);
onprocessingtime 這個方法時到了定時器觸發的時候,會呼叫這個方法。這個方法主要作用如下:
1、從使用者的**中獲取watermark,如果存在watermark,並且時間大於currentwatermark,那麼就emit乙個水印到下游。
2、獲取當前時間now,然後用now+watermarkinterval 繼續註冊乙個process定時器。
public void processelement(streamrecord element) throws exception {
// 獲取事件時間,然後傳送出去final long newtimestamp = userfunction.extracttimestamp(element.getvalue(),
element.hastimestamp() ? element.gettimestamp() : long.min_value);
output.collect(element.replace(element.getvalue(), newtimestamp));
上面的processelement 方法,就是從使用者**中獲取時間,然後註冊到element中,輸出到下游。
上面我們就分析了flink中的兩種水印。
flink水印的產生方式
assignerwithpunctuatedwatermarks 每乙個event到來的時候,就會提取一次watermark assignerwithperiodicwatermarks 可以定義乙個最大允許亂序的時間,生成水印的間隔 每n毫秒 使用 executionconfig.setautow...
Flink水印機制(watermark)
flink流處理時間方式 設定flink流處理的時間型別 env.setstreamtimecharacteristic timecharacteristic.eventtime 問題 1.使用時間視窗來統計10分鐘內的使用者流量 2.有乙個時間視窗 3.有乙個資料,因為網路延遲 4.時間視窗並沒有...
Flink 事件 水印 計算的關係
看了好久的對水印的介紹,總結出以下的關係。水印 用於衡量事件時間進度的機制 為了解決亂序事件輸出正確的結果。事件 水印 計算的關係 事件生成水印的策略 1 遞增式的水印生成,適合遞增的資料,如果有不遞增的資料,那麼會被認為壞資料處理 2 週期計算,每次生成通過 週期大小,比如設定的週期是10s,那麼...