實際生產中,由於各種原因,導致事件建立時間與處理時間不一致,收集的規定對實時推薦有較大的影響。所以一般情況時選取建立時間,然後事先建立flink的時間視窗。但是問題來了,如何保證這個視窗的時間內所有事件都到齊了?這個時候就可以設定水位線(watermark)。
概念:支援基於時間視窗操作,由於事件的時間**於源頭系統,很多時候由於網路延遲、分布式處理,以及源頭系統等各種原因導致源頭資料的事件時間可能亂序。這時可以設定乙個時間閾值,或者說水位線(watermark),其作用定義乙個最大亂序時間,比如某條日誌時間為2019-01-01 08:00:10,如果亂序最大允許時間為10s,那麼就認為2019-01-01 08:00:00之前產生的所有事件都到齊了,可以進行計算。
時間視窗:指定乙個固定時間間隔的視窗
一、滑動視窗
1、slidingeventtimewindows.of(time.second(4), time.seconds(3)):表示滑動視窗大小為4秒,滑動步長是3 秒,同時,每3秒才滑動一次;
2、每條資料存活的時間為滑動視窗的大小;
3、如果滑動視窗超過之前的視窗,那麼後面來的屬於前面視窗的資料會丟失;
4、來了一條資料,邊移動邊計算滑動視窗的資料(乙個視窗停留,計算一次,不移動,不計算 ),直至視窗到達指定位置。
計算某位置時間的公式:
//n:時間戳;size視窗大小;slide:滑動長度
//根據等差公式推導
an = a1 + (x-1)*s
a1 = size - slide -1x = [n - (size-slide)]/slide //
除數後再乘以slide
s =slide //
當來了一條時間戳為n的事件,就認為指定位置時間之前的所有事件都到齊了
指定位置 = (size-slide-1) + [(n-watermark) - (size-slide)]/slide * slide
二、翻滾視窗
基於時間視窗,對連續資料進行迭代計算時,不會重疊。翻滾視窗是乙個特殊的滑動視窗,當視窗的長度等於滑動的長度時,滑動視窗就是翻滾視窗。
計算某位置時間的公式:
指定位置 = -1 + (n-watermark)/size * size //除數後再乘以size,size為視窗大小,n為時間戳
三、會話視窗
時間間隔達到一定時間長度時才進行統計計算。
##測試**(需要集群telnet乙個producer):
packagecom.cjs
import
org.apache.flink.streaming.api.timecharacteristic
import
org.apache.flink.streaming.api.functions.timestamps.boundedoutofordernesstimestampextractor
import
org.apache.flink.streaming.api.scala.streamexecutionenvironment
import
org.apache.flink.streaming.api.windowing.time.time
import
org.apache.flink.api.scala._
import
org.apache.flink.streaming.api.windowing.assigners.
object watermarktest
})
//提取時間戳之後,該資料流是帶有時間的,用於事件視窗
.map(x=>(x.split(" ")(1),1l)).keyby(0)
//設定使用事件時間,因為watermark是基於事件時間
senv.setstreamtimecharacteristic(timecharacteristic.eventtime)
//定義翻滾視窗
//直接輸出,沒有用到事件時間視窗,flink預設是累計統計,來乙個,統計乙個
//定義滑動視窗
stream.window(slidingeventtimewindows.of(time.seconds(4),time.seconds(2))).sum(1).print()
senv.execute("watermark")
} }
Flink學習筆記之WaterMark
event time 業務系統中事件發生的事件。通常因為各種原因會有部分延遲到達系統,所以需要進行亂序處理。ingestion time 到達流處理系統的事件,因為是在入口的地方賦值,具有流中統一不變的特性。processing time 流處理器的本地事件,因為flink是併發執行,各個處理器的本...
Flink水印機制(watermark)
flink流處理時間方式 設定flink流處理的時間型別 env.setstreamtimecharacteristic timecharacteristic.eventtime 問題 1.使用時間視窗來統計10分鐘內的使用者流量 2.有乙個時間視窗 3.有乙個資料,因為網路延遲 4.時間視窗並沒有...
資料水印 watermark
外發資料建立水印 產品通過對外發資料進行新增資料標記 自動生成水印 資料來源追溯等功能,避免了內部人員外發資料洩露無法對事件追溯,提高了資料傳遞的安全性和可追溯能力。資料水印系統 資料安全管理工具 安華金和 加密資料解密演算法 介面如果涉及敏感資料 如wx.getuserinfo當中的 openid...