觸發器定義了window何時會被求值以及何時傳送求值結果。觸發器可以到了特定的時間觸發也可以碰到特定的事件觸發。例如:觀察到事件數量符合一定條件或者觀察到了特定的事件。
觸發器可以訪問流的時間屬性以及定時器,還可以對state狀態程式設計。所以觸發器和process function一樣強大。例如我們可以實現乙個觸發邏輯:當視窗接收到一定數量的元素時,觸發器觸發。再比如當視窗接收到乙個特定元素時,觸發器觸發。還有就是當視窗接收到的元素裡面包含特定模式(5秒鐘內接收到了兩個同樣型別的事件),觸發器也可以觸發。在乙個事件時間的視窗中,乙個自定義的觸發器可以提前(在水位線沒過視窗結束時間之前)計算和發射計算結果。這是乙個常見的低延遲計算策略,儘管計算不完全,但不像預設的那樣需要等待水位線沒過視窗結束時間。
每次呼叫觸發器都會產生乙個triggerresult來決定視窗接下來發生什麼。triggerresult可以取以下結果:
continue:什麼都不做
fire:如果window operator有processwindowfunction這個引數,將會呼叫這個processwindowfunction。如果視窗僅有增量聚合函式(reducefunction或者aggregatefunction)作為引數,那麼當前的聚合結果將會被傳送。視窗的state不變。
purge:視窗所有內容包括視窗的元資料都將被丟棄。
fire_and_purge:先對視窗進行求值,再將視窗中的內容丟棄。
triggerresult可能的取值使得我們可以實現很複雜的視窗邏輯。乙個自定義觸發器可以觸發多次,可以計算或者更新結果,可以在傳送結果之前清空視窗。
接下來我們看一下trigger api:
public
abstract
class
trigger
window
>
implements
serializable
public
inte***ce
triggercontext
public
inte***ce
onmergecontext
extends
triggercontext
這裡要注意兩個地方:清空state和merging合併觸發器。
當在觸發器中使用per-window state時,這裡我們需要保證當視窗被刪除時state也要被刪除,否則隨著時間的推移,window operator將會積累越來越多的資料,最終可能使應用崩潰。
當視窗被刪除時,為了清空所有狀態,觸發器的clear()方法需要需要刪掉所有的自定義per-window state,以及使用triggercontext物件將處理時間和事件時間的定時器都刪除。
下面的例子展示了乙個觸發器在視窗結束時間之前觸發。當第乙個事件被分配到視窗時,這個觸發器註冊了乙個定時器,定時時間為水位線之前一秒鐘。當定時事件執行,將會註冊乙個新的定時事件,這樣,這個觸發器每秒鐘最多觸發一次。
自定義實現: 通過觸發器在15秒的視窗內每秒觸發一次計算
import com.atguigu.streamingjob.
import org.apache.flink.api.common.state.
import org.apache.flink.api.scala.typeutils.types
import org.apache.flink.streaming.api.timecharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.processwindowfunction
import org.apache.flink.streaming.api.windowing.time.time
import org.apache.flink.streaming.api.windowing.triggers.
import org.apache.flink.streaming.api.windowing.windows.timewindow
import org.apache.flink.util.collector
case
class
sensorreading
(id:string,timestamp :long,temperature : double)
object triggerstest
//建立視窗的全量函式 in out key windom
class
allwindom
extends
processwindowfunction
[sensorreading,
(string,double,double,long)
,string,timewindow]}}
//設定一秒鐘一次的觸發器
class
onesecondintervaltrigger
extends
trigger
[sensorreading , timewindow]
else
}//支觸發計算
triggerresult.fire
}//這是系統時間的 ,不執行業務邏輯
override def onprocessingtime
(l: long,
w: timewindow,
triggercontext: trigger.triggercontext)
: triggerresult =
//每個視窗的結束時呼叫
override def clear
(w: timewindow,
triggercontext: trigger.triggercontext)
: unit =
//每個資料呼叫一次
override def onelement
(t: sensorreading,
l: long,
w: timewindow,
triggercontext: trigger.triggercontext)
: triggerresult =
triggerresult.continue
}
SQL Server 建立觸發器(trigger)
觸發器簡介 觸發器是一種特殊的儲存過程,它的執行不是由程式呼叫,也不是手動執行,而是由事件來觸發。觸發器是當對某乙個表進行操作。例如 update insert delete這些操作的時候,系統會自動呼叫執行該錶上對應的觸發器。觸發器分類 1 dml 資料操縱語言 data manipulation...
oracle資料庫觸發器使用 trigger
1.更新前觸發,不允許週日修改 create or replace trigger auth secure before insert or update or delete on tb emp begin if to char sysdate,dy 星期日 then end if end 2.自增...
觸發器 mysql觸發器
觸發器是一種特殊的儲存過程,它在插入 刪除或修改特定表中的資料時觸發執行,它比資料庫本身標準的功能有更精細和更複雜的資料控制能力。和儲存過程一樣,很少使用。1 觸發器的作用 2 建立觸發器 建立測試環境 mysql create database test db query ok,1 row aff...