flink自定義trigger詳解

2021-10-22 19:26:37 字數 3120 閱讀 1452

[1]中有句話是這樣的:

"其實,我們要實現基於事件時間的視窗隨意輸出,比如1000個元素觸發一次輸出,那麼我們就可以通過修改這個觸發器來實現。"

這句話的意思是,預設的自帶的trigger一般是基於eventtime的。

那麼這1000 個元素可能跨度是一小時,也可能跨度是兩小時,對吧

但是顯然預設的trigger只能是盯著eventtime(時間戳)來決定是否觸發計算,並不能根據元素個數進行觸發。

也就是說,預設的trigger盯著的跨度是"時間差"。而不是"個數差"

講人話就是:

①例如flink的trigger預設每隔一天輸出統計資料,

②但是不支援預設每隔一千個訂單輸出統計資料。

但是注意這裡的一千個統計資料可能超過一天,甚至超過一周,耗時可能不固定。

因為你想啊,**都是要把邏輯寫死的對吧?

一千個訂單可能一開始耗時一周,後來耗時乙個月。那程式要怎麼根據變化的時間來鎖定一千個訂單觸發一次?

顯然做不到,這個時候我們就希望鎖定"個數間隔"、「個數差」,這個時候就需要自定義trigger

下面是官方文件[4]中triggers這一節的內容概括

需要override的函式

函式作用

onelement()資料(element)被加入window的時候會呼叫該函式

oneventtime()當乙個註冊的event-time定時器觸發

onprocessingtime()當乙個註冊的processing-time定時器觸發

onmerge()與有狀態觸發器(stateful triggers)和當兩個視窗整合的時候整合(merge)狀態相關。

例如使用session windows

clear()window清理資料需要

前面三個用來設定呼叫事件(invocation event)以後如何操作,

所以這些"操作"必須是乙個triggerresult

也就是說,前三個函式返回的triggerresult可以是下面幾種選擇:

返回的triggerresult作用

continue什麼都不做

fire觸發計算

purge刪除視窗中的所有資料

fire_and_purg觸發計算後刪除視窗中所有資料

然後是fire and purge這一節的內容:

觸發計算時,返回的一定是fire或者fire_and_purg(這個話僅僅是來自官方文件的翻譯,其實intellij提示的選項並不僅僅是上面幾個)

具體示範**參考[5]即可

private static logger log = logge***ctory.getlogger(counttriggerwithtimeout.class);

/*** 視窗最大資料量

*/private int maxcount;

/*** event time / process time

*/private timecharacteristic timetype;

/*** 用於儲存視窗當前資料量的狀態物件

*/private reducingstatedescriptorcountstatedescriptor =

new reducingstatedescriptor("counter", new sum(), longserializer.instance);

public counttriggerwithtimeout(int maxcount, timecharacteristic timetype)

private triggerresult fireandpurge(timewindow window, triggercontext ctx) throws exception

@override

public triggerresult onelement(t element, long timestamp, timewindow window, triggercontext ctx) throws exception

if (timestamp >= window.getend()) else

}@override

public triggerresult onprocessingtime(long time, timewindow window, triggercontext ctx) throws exception

if (time >= window.getend()) else

}@override

public triggerresult oneventtime(long time, timewindow window, triggercontext ctx) throws exception

if (time >= window.getend()) else

}@override

public void clear(timewindow window, triggercontext ctx) throws exception

/*** 計數方法

*/class sum implements reducefunction

}}

reference:

[1]flink自定義trigger-實現視窗隨意輸出

[2]flink 自定義trigger

[3]flink 自定義trigger

[4]flink官方文件-視窗

[5]flink 自定義觸發器

flink寫入kafka之自定義分割槽器

直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章 接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器 直接上 自定義分割槽 suppresswarnings unchecke...

flink 自定義單並行度的source源

自己定義乙個單並行度的source,需要自己實現乙個sourcefunction介面 import org.apache.flink.streaming.api.functions.source.sourcefunction 自己定義乙個單並行度的source 需要自己實現乙個sourcefunct...

自定義 如何自定義協議

何為自定義協議,其實是相對標準協議來說的,這裡主要針對的是應用層協議 常見的標準的應用層協議如http ftp smtp等,如果我們在網路通訊的過程中不去使用這些標準協議,那就需要自定義協議,比如我們常用的rpc框架 dubbo,thrift 分布式快取 redis,memcached 等都是自定義...