Flink專案開發記錄之合併資料流

2021-10-01 06:37:09 字數 1286 閱讀 6352

專案目前的jar包環境:flink1.9.0

這邊就是常用jar,就不列出了,合併流我這邊選擇是使用相同的key流然後設定時間的上下限,進行合流

專案背景:由於業務需求,將原本的從kafka接收的資料流,經過業務a處理,變成了a,b兩個datastream,a、b兩個流分別經過若干個業務處理後,需要合併到乙個流中接著經過業務b、c、d等處理在sink到es中,所以上flink官網學習了一下合流的過程記錄下來

**如下:

val devicestreamcombind = stadata1.map(x=>)//需要合併的副流

//stationdatabefore2主流

val stationdatabefore3 = stationdatabefore2.keyby(_._1).intervaljoin(devicestreamcombind.keyby(_._1)).between(org.apache.flink.streaming.api.windowing.time.time.seconds(-5),org.apache.flink.streaming.api.windowing.time.time.seconds(5))

.process(new processjoinfunction[(int,array[stationdata]),(int,array[stationdata]),(array[stationdata])]

}).flatmap(new richflatmapfunction[array[stationdata],stationdata] )}})

.filter(_ != null)

intervaljoin方法必須是在keyedstream下才能呼叫,所以人為的將key設定為-1,而之後的between是必須呼叫的,它的作用是將主流的時間進行乙個範圍,分別加上上下限(這也就是為什麼下限要為負數),可以通過intervaljoinoperator下的processelement方法進行檢視

這邊需要設定乙個執行環境的事件時間

如果不設定此時間的話,record.gettimestamp為負數將會報錯(如果你沒有認為的設定水印,flink將以當前系統的時間自動獲取為timestamp),設定為事件時間型別後,才可執行(原始碼中有判斷當前的時間型別是否為事件時間,合流必須使用事件時間)

之後的便是常規的new processjoinfunction類,類的型別只要按照提示輸入流1和流2的型別以及指定最後的輸出型別即可

Flink 雙流合併之connect Demo2

1 主類 package towstream program demo description author yang create 2020 12 31 11 39 import org.apache.flink.api.common.state.import org.apache.flink.a...

Flink 雙流合併之connect Demo1

1 主類 package towstream program demo description author yang create 2020 12 31 11 39 import org.apache.flink.api.common.state.import org.apache.flink.a...

專案開發記錄

有表的情況 insert into 表 val1,val2 select val1,val2 from 表 沒表的情況 select into 表 from 表 更新update 表 set urlid kd.id from keywords kd where kd 2 c 批量導數 採用.net ...