window分為兩大類:
countwindow:按照指定的資料條數生成乙個window,與時間無關。
timewindow:按照時間生成window:對於timewindow,可以根據視窗實現原理的不同分成三類:滾動視窗(tumbling window)、滑動視窗(sliding window)和會話視窗(session window)。
滾動視窗(tumbling windows)
將資料依據固定的視窗長度對資料進行切片。
特點:時間對齊,視窗長度固定,沒有重疊。滾動視窗分配器將每個元素分配到乙個指定視窗大小的視窗中,滾動視窗有乙個固定的大小,並且不會出現重疊。
滑動視窗(sliding windows)
滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。
特點:時間對齊,視窗長度固定,有重疊。滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另乙個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於視窗大小的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。
會話視窗(session windows)
由一系列事件組合乙個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。
特點:時間無對齊。
//遙感資料樣例類 資料**id , 產生的時間戳 ,溫度
case
class
sensorreading
(id:string,timestamp :long,temperature : double)
flink預設的時間視窗根據processing time進行視窗的劃分,將flink獲取到的資料根據進入flink的時間劃分到不同的視窗中。
// 每個感測器每個滾動視窗(15s)的最小溫度值
val mintempperwindow: datastream[
(string, double)
]= sensordata
.map
(r =
>
(r.id, r.temperature)
)// 按照感測器id分流
.keyby
(_._1)
.timewindow
(time.
seconds(15
)).reduce
((r1, r2)
=>
(r1._1, r1._2.
min(r2._2)
))
這種設定就是根據到達系統的時間為依據就行開窗計算,系統時間到達視窗結束時間時就會觸發視窗的計算。
使用事件時間為依據
在flink的流式處理中,絕大部分的業務都會使用event time,一般只在event time無法使用時,才會被迫使用processing time或者ingestion time。 如果要使用event time,那麼需要引入event time的時間屬性,引入方式如下所示:
val env = streamexecutionenvironment.getexecutionenvironment
// 從呼叫時刻開始給env建立的每乙個stream追加時間特徵
env.
setstreamtimecharacteristic
(timecharacteristic.eventtime)
流處理從事件產生,到流經source,再到operator,中間是有乙個過程和時間的,雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、分布式等原因,導致亂序的產生,所謂亂序,就是指flink接收到的事件的先後順序不是嚴格按照事件的event time順序排列的。
watermark是一種衡量event time進展的機制,它是資料本身的乙個隱藏屬性,資料本身攜帶著對應的watermark。
watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。
資料流中的watermark用於表示timestamp小於watermark的資料,都已經到達了,因此,window的執行也是由watermark觸發的。
watermark可以理解成乙個延遲觸發機制,我們可以設定watermark的延時時長t,每次系統會校驗已經到達的資料中最大的maxeventtime,然後認定event time小於maxeventtime - t的所有資料都已經到達,如果有視窗的停止時間等於maxeventtime – t,那麼這個視窗被觸發執行。
assignerwithperiodicwatermarks
assignerwithpunctuatedwatermarks
以上兩個介面都繼承自timestampassigner。
assignerwithperiodicwatermark是週期性的產生水銀,預設時間是200毫秒,可以通過引數來設定
// 每隔5秒產生乙個水印
env.getconfig.
setautowatermarkinterval
(5000)
eg:週期性的時間戳抽取
class
periodicassigner
extends
assignerwithperiodicwatermarks
[sensorreading]
override def extracttimestamp
(r: sensorreading, previousts: long)=}
//這種情況設定延時以後的產生的水位線
如果我們事先得知資料流的時間戳是單調遞增的,也就是說沒有亂序。我們可以使用assignascendingtimestamps,方法會直接使用資料的時間戳生成水印。
val stream: datastream[sensorreading]=.
..val withtimestampsandwatermarks = stream
.assignascendingtimestamps
(e =
> e.timestamp)
如果能夠大致推算出資料的中時間的最大延遲時間可以使用:
val stream: datastream[sensorreading]=.
..val withtimestampsandwatermarks = stream.
assigntimestampsandwatermarks
(new
sensortimeassigner
)class
sensortimeassigner
extends
boundedoutofordernesstimestampextractor
[sensorreading]
(time.
seconds(5
))
assignerwithpunctuatedwatermarks 是間歇性的產生水位線 比如我們可以只對某乙個key的資料產生水位線:
直接上**,只給sensor_1的感測器的資料流插入水印
class
punctuatedassigner
extends
assignerwithpunctuatedwatermarks
[sensorreading]
else
}//或事件時間的方式
override def extracttimestamp
(r: sensorreading, previousts: long)
: long =
}
水位線的設定需要權衡:
1)對資料處理要求嚴格就需要得到水位線之前的所有資料,必然需要增大延遲時間,但帶來的壓力是記憶體中的資料會產生更多的擠壓,造成記憶體壓力
2)設定的延遲時間稍微小一點後可以減少觸發的等待時間,緩解記憶體壓力,但是可能會丟失延遲的資料,但可以通過遲到資料的處理來更新視窗運算的結果
介紹flink對遲到資料的處理(預設是直接捨棄):
Flink流處理的時間視窗
對於流處理系統來說,流入的訊息是無限的,所以對於聚合或是連線等操作,流處理系統需要對流入的訊息進行分段,然後基於每一段資料進行聚合或是連線等操作。訊息的分段即稱為視窗,流處理系統支援的視窗有很多態別,最常見的就是時間視窗,基於時間間隔對訊息進行分段處理。本節主要介紹flink流處理系統支援的各種時間...
Flink的滾動視窗 會話視窗 滑動視窗及其應用
flink作業中的視窗 是指一種對無限資料流設定有限資料集,從而實現了處理無線資料流的機制。視窗本身只是個劃分資料集的依據,它並不儲存資料。當我們需要在時間視窗維度上對資料進行聚合時,視窗是流處理應用中經常需要解決的問題。flink的視窗運算元為我們提供了方便易用的api,我們可以將資料流切分成乙個...
Flink流式計算裡的時間和watermark機制
流計算 是相對於 批計算 來的,mapreduce,spark底層的計算方式是目前主流的 批計算 實現方式,很多公司在使用這種方式做大資料處理。但是越來越多的公司目前開始關注 流計算 主要有以下一些原因 1 對處理時間的要求。隨著技術的進步,使用者對 延遲 的忍受能力越來越弱,能更及時發現問題 解決...