Flink 側輸出流 SideOutput

2022-01-26 12:31:16 字數 1202 閱讀 6372

大部分的 datastream api 的運算元的輸出是單一輸出,也就是某種資料型別的流。除了 split 運算元,可以將一條流分成多條流,這些流的資料型別也都相同。processfunction 的 side outputs 功能可以產生多條流,並且這些流的資料型別可以不一樣。乙個 sideoutput 可以定義為 outputtag[x]物件,x 是輸出流的資料型別。processfunction 可以通過 context 物件發射乙個事件到乙個或者多個 side outputs。

下面的**演示了低於32f的溫度資訊進入到測輸出流"freezing alert"中。

object sideoutputtest )

.assigntimestampsandwatermarks(new boundedoutofordernesstimestampextractor[sensorreading](time.seconds(1)) )

//低溫報警處理

val processstream = datastream.process(new freezingalert)

//列印主輸出流

processstream.print("process stream")

//列印側輸出流。先得到某個測輸出流。

processstream.getsideoutput(new outputtag[string]("freezing alert")).print("freezing alert")

env.execute("window test")

}}class freezingalert extends processfunction[sensorreading, sensorreading] else

}}

埠資料

[atguigu@hadoop102 ~]$ nc -lk 7777

sensor_1, 1547718200, 30

sensor_1, 1547718200, 25

sensor_1, 1547718200, 35

控制台列印

freezing alert> freezing alert for 30.0

freezing alert> freezing alert for 25.0

process stream> sensorreading(sensor_1,1547718200,35.0)

Flink側輸出流(Side Output)

需求 如果溫度值小於32f,就將報警資訊輸出到側輸出流中 package com.run.wc import org.apache.flink.streaming.api.scala.import org.apache.flink.api.scala.import org.apache.flink....

Flink 側輸出流(SideOutput)

要求 將小於32 輸入到側輸入流,大於32 輸入到主流 import org.apache.flink.api.scala.import org.apache.flink.streaming.api.timecharacteristic import org.apache.flink.streami...

flink的側輸出流

在 flink 處理資料流時,我們經常會遇到這樣的情況 在處理乙個資料來源時,往往需要將該源中的不同型別的資料做分割處理,如果使用 filter 運算元對資料來源進行篩選分割的話,勢必會造成資料流的多次複製,造成不必要的效能浪費 flink 中的側輸出就是將資料流進行分割,而不對流進行複製的一種分流...