Flink的時間和視窗的使用 水位線的設定

2021-10-01 17:12:49 字數 4071 閱讀 9294

window分為兩大類:

countwindow:按照指定的資料條數生成乙個window,與時間無關。

timewindow:按照時間生成window:對於timewindow,可以根據視窗實現原理的不同分成三類:滾動視窗(tumbling window)、滑動視窗(sliding window)和會話視窗(session window)。

滾動視窗(tumbling windows)

將資料依據固定的視窗長度對資料進行切片。

特點:時間對齊,視窗長度固定,沒有重疊。滾動視窗分配器將每個元素分配到乙個指定視窗大小的視窗中,滾動視窗有乙個固定的大小,並且不會出現重疊。

滑動視窗(sliding windows)

滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。

特點:時間對齊,視窗長度固定,有重疊。滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另乙個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於視窗大小的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。

會話視窗(session windows)

由一系列事件組合乙個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。

特點:時間無對齊。

//遙感資料樣例類 資料**id , 產生的時間戳 ,溫度

case

class

sensorreading

(id:string,timestamp :long,temperature : double)

flink預設的時間視窗根據processing time進行視窗的劃分,將flink獲取到的資料根據進入flink的時間劃分到不同的視窗中。

// 每個感測器每個滾動視窗(15s)的最小溫度值

val mintempperwindow: datastream[

(string, double)

]= sensordata

.map

(r =

>

(r.id, r.temperature)

)// 按照感測器id分流

.keyby

(_._1)

.timewindow

(time.

seconds(15

)).reduce

((r1, r2)

=>

(r1._1, r1._2.

min(r2._2)

))

這種設定就是根據到達系統的時間為依據就行開窗計算,系統時間到達視窗結束時間時就會觸發視窗的計算。

使用事件時間為依據

在flink的流式處理中,絕大部分的業務都會使用event time,一般只在event time無法使用時,才會被迫使用processing time或者ingestion time。 如果要使用event time,那麼需要引入event time的時間屬性,引入方式如下所示:

val env = streamexecutionenvironment.getexecutionenvironment

// 從呼叫時刻開始給env建立的每乙個stream追加時間特徵

env.

setstreamtimecharacteristic

(timecharacteristic.eventtime)

流處理從事件產生,到流經source,再到operator,中間是有乙個過程和時間的,雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、分布式等原因,導致亂序的產生,所謂亂序,就是指flink接收到的事件的先後順序不是嚴格按照事件的event time順序排列的。

watermark是一種衡量event time進展的機制,它是資料本身的乙個隱藏屬性,資料本身攜帶著對應的watermark。

watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。

資料流中的watermark用於表示timestamp小於watermark的資料,都已經到達了,因此,window的執行也是由watermark觸發的。

watermark可以理解成乙個延遲觸發機制,我們可以設定watermark的延時時長t,每次系統會校驗已經到達的資料中最大的maxeventtime,然後認定event time小於maxeventtime - t的所有資料都已經到達,如果有視窗的停止時間等於maxeventtime – t,那麼這個視窗被觸發執行。

assignerwithperiodicwatermarks

assignerwithpunctuatedwatermarks

以上兩個介面都繼承自timestampassigner。

assignerwithperiodicwatermark是週期性的產生水銀,預設時間是200毫秒,可以通過引數來設定

// 每隔5秒產生乙個水印

env.getconfig.

setautowatermarkinterval

(5000)

eg:週期性的時間戳抽取

class

periodicassigner

extends

assignerwithperiodicwatermarks

[sensorreading]

override def extracttimestamp

(r: sensorreading, previousts: long)=}

//這種情況設定延時以後的產生的水位線

如果我們事先得知資料流的時間戳是單調遞增的,也就是說沒有亂序。我們可以使用assignascendingtimestamps,方法會直接使用資料的時間戳生成水印。

val stream: datastream[sensorreading]=.

..val withtimestampsandwatermarks = stream

.assignascendingtimestamps

(e =

> e.timestamp)

如果能夠大致推算出資料的中時間的最大延遲時間可以使用:

val stream: datastream[sensorreading]=.

..val withtimestampsandwatermarks = stream.

assigntimestampsandwatermarks

(new

sensortimeassigner

)class

sensortimeassigner

extends

boundedoutofordernesstimestampextractor

[sensorreading]

(time.

seconds(5

))

assignerwithpunctuatedwatermarks 是間歇性的產生水位線 比如我們可以只對某乙個key的資料產生水位線:

直接上**,只給sensor_1的感測器的資料流插入水印

class

punctuatedassigner

extends

assignerwithpunctuatedwatermarks

[sensorreading]

else

}//或事件時間的方式

override def extracttimestamp

(r: sensorreading, previousts: long)

: long =

}

水位線的設定需要權衡:

1)對資料處理要求嚴格就需要得到水位線之前的所有資料,必然需要增大延遲時間,但帶來的壓力是記憶體中的資料會產生更多的擠壓,造成記憶體壓力

2)設定的延遲時間稍微小一點後可以減少觸發的等待時間,緩解記憶體壓力,但是可能會丟失延遲的資料,但可以通過遲到資料的處理來更新視窗運算的結果

介紹flink對遲到資料的處理(預設是直接捨棄):

Flink流處理的時間視窗

對於流處理系統來說,流入的訊息是無限的,所以對於聚合或是連線等操作,流處理系統需要對流入的訊息進行分段,然後基於每一段資料進行聚合或是連線等操作。訊息的分段即稱為視窗,流處理系統支援的視窗有很多態別,最常見的就是時間視窗,基於時間間隔對訊息進行分段處理。本節主要介紹flink流處理系統支援的各種時間...

Flink的滾動視窗 會話視窗 滑動視窗及其應用

flink作業中的視窗 是指一種對無限資料流設定有限資料集,從而實現了處理無線資料流的機制。視窗本身只是個劃分資料集的依據,它並不儲存資料。當我們需要在時間視窗維度上對資料進行聚合時,視窗是流處理應用中經常需要解決的問題。flink的視窗運算元為我們提供了方便易用的api,我們可以將資料流切分成乙個...

Flink流式計算裡的時間和watermark機制

流計算 是相對於 批計算 來的,mapreduce,spark底層的計算方式是目前主流的 批計算 實現方式,很多公司在使用這種方式做大資料處理。但是越來越多的公司目前開始關注 流計算 主要有以下一些原因 1 對處理時間的要求。隨著技術的進步,使用者對 延遲 的忍受能力越來越弱,能更及時發現問題 解決...