flink cep簡單理解就是使用**中的定義的規則去匹配流式資料,找出能成功匹配的資料先理一下flink cep的**流程
先定義pattern
pattern.begin[x]("start").where(...).next("middle").where(...)
通過cep.pattern()方法將datastream轉化為patternstreamval cepresult: patternstream[event] = cep.pattern(inputdatastream, pattern)
將符合pattern的資料呼叫select方法對資料進行處理
cepresult.select(new patternselectfunction[x, string]
}
具體的**示例object cepfun01
println("water-mark:", checkandgetnextwatermark(element, 0l))
timestamp
}override def checkandgetnextwatermark(lastelement: (event, long), extractedtimestamp: long): watermark =
new watermark(lastemittedwatermark)
}}).map(_._1)
/*** 1、首先要定義pattern,start的條件為id=3,next的條件為score>=3,結束條件為score>=5
* 意思是只要符合以id為3開頭,並且接下來的第一條資料的score大於等於3,第二條資料大於等於5即滿足pattern
*/val pattern = pattern.begin[event]("start").where(event => event.id == 3)
.next("middle").where(event => event.score >= 3)
.followedby("end").where(event => event.score >= 5)
/*** 2、通過cep.pattern()方法將datastream轉化為patternstream
*/val cepresult: patternstream[event] = cep.pattern(input, pattern)
input.print()
/*** 3、將符合pattern的資料呼叫select方法對資料進行處理
*/cepresult.select(new patternselectfunction[event, string]
res}
}).print()
senv.execute(this.getclass.getname)
}}// 使用了lombok依賴,方便列印的時候檢視具體資料
@data
case class event(id: int, name: string, score: double)
執行上述**,會得到如下輸出:
start:【event(3,third,3.0)】 ->middle: 【event(4,forth,4.0)】 ->end: 【event(6,fifth,6.0)】
按照event-time進行cep匹配,id-5的資料為遲到的資料,所以是:3-4-6
具體**可在github上檢視:github**位址
對於物件查詢
結論 對於物件查詢 1 使用list的時候會將物件全部取出,而使用iterate則只先將物件主鍵取出,然後在使用的時候再乙個個取出。2 list第二使用的時候會繼續重新資料庫中取出,而iterate則會先成快取中查詢,如果沒找到再去資料庫中取出。對於屬性查詢 條件 查詢快取關閉 兩者沒什麼差別,根據...
對於物件查詢
對於物件查詢 1 使用list的時候會將物件全部取出,而使用iterate則只先將物件主鍵取出,然後在使用的時候再乙個個取出。2 list第二使用的時候會繼續重新資料庫中取出,而iterate則會先成快取中查詢,如果沒找到再去資料庫中取出。對於屬性查詢 條件 查詢快取關閉 兩者沒什麼差別,根據查詢物...
對於order by子句
order by子句指定排序順序 select username from user order by username 依據username的字母順序對於查詢出來的username進行排序,預設是公升序 a z 也可以進行降序排序,必須指定desc關鍵字 在上面的sql語句變為 select us...