需求:如果溫度值小於32f,就將報警資訊輸出到側輸出流中
package com.run.wc
import org.apache.flink.streaming.api.scala.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.processfunction
import org.apache.flink.streaming.api.functions.timestamps.boundedoutofordernesstimestampextractor
import org.apache.flink.streaming.api.windowing.time.time
import org.apache.flink.util.collector
/** * @author 霄嵩
*//**
* sideoutput側輸出流
* 需求:如果溫度值小於32f,就將報警資訊輸出到側輸出流中
*/object sideoutputtest
) val stream = mapstream.
assigntimestampsandwatermarks
(new
boundedoutofordernesstimestampextractor
[sensor]
(time.
seconds(1
))})
val processstream = stream.
process
(new
sideoutputalert()
) mapstream.
print
("map data"
) processstream.
print
("process data"
)//通過getsideoutput獲取側輸出流,並列印輸出
processstream.
getsideoutput
(new
outputtag
[string]
("freezingalert"))
.print
("side output data"
) env.
execute
("side output test")}
}//溫度值小於32f,就將報警資訊輸出到側輸出流中
class
sideoutputalert()
extends
processfunction
[sensor, sensor]
的溫度值小於32f")}
else}}
result:
process data>
sensor
(sensor_1,
1547718207
,37.0
)map data>
sensor
(sensor_1,
1547718207
,37.0
)process data>
sensor
(sensor_1,
1547718207
,36.0
)map data>
sensor
(sensor_1,
1547718207
,36.0
)side output data> sensor_1的溫度值小於32f
map data>
sensor
(sensor_1,
1547718207
,31.0
)process data>
sensor
(sensor_1,
1547718207
,36.0
)map data>
sensor
(sensor_1,
1547718207
,36.0
)side output data> sensor_1的溫度值小於32f
map data>
sensor
(sensor_1,
1547718207
,20.0
)
Flink 側輸出流(SideOutput)
要求 將小於32 輸入到側輸入流,大於32 輸入到主流 import org.apache.flink.api.scala.import org.apache.flink.streaming.api.timecharacteristic import org.apache.flink.streami...
flink的側輸出流
在 flink 處理資料流時,我們經常會遇到這樣的情況 在處理乙個資料來源時,往往需要將該源中的不同型別的資料做分割處理,如果使用 filter 運算元對資料來源進行篩選分割的話,勢必會造成資料流的多次複製,造成不必要的效能浪費 flink 中的側輸出就是將資料流進行分割,而不對流進行複製的一種分流...
Flink 側輸出流 SideOutput
大部分的 datastream api 的運算元的輸出是單一輸出,也就是某種資料型別的流。除了 split 運算元,可以將一條流分成多條流,這些流的資料型別也都相同。processfunction 的 side outputs 功能可以產生多條流,並且這些流的資料型別可以不一樣。乙個 sideout...