什麼是側輸出
在flink處理資料流時,我們經常會遇到這樣的情況:在處理乙個資料來源時,往往需要將該源中的不同型別的資料做分割處理,如果使用filter運算元對資料來源進行篩選分割的話,勢必會造成資料流的多次複製,造成不必要的效能浪費;flink中的側輸出就是將資料流進行分割,而不對流進行複製的一種分流機制。flink的側輸出的另乙個作用就是對延時遲到的資料進行處理,這樣就可以不必丟棄遲到的資料。
側輸出案例
我們結合實際案例說明一下flink側輸出的用法,假設我們的需求是實時的從kafka接收生產資料,我們需要對遲到超過一定時長的資料另行處理:
定義資料類
case class kafkadata(
id: string, //id
eventtime: long // 時間時間
) object kafkadata
}
flink流處理程式
val env = streamexecutionenvironment.getexecutionenvironment
// 從kafka獲取資料流
val kafkaconsumer = kafkaconsumersource.getcomsumer(
load.getstring("kafkabootstrap"),
"test",
load.getstring("kafkagroupid_weather"),
new ******stringschema
)kafkaconsumer.setstartfromlatest()
第一步:定義outputtag
// 定義 outputtag 側輸出的資料格式可以不應和主流的資料格式一樣
val delayoutputtag = outputtag[string]("delay-side-output")
第二步:使用特定的函式將資料傳送到側輸出
使用側輸出時需要使用特定的函式傳送資料,具體可以使用一下函式:
val datasource = env.addsource(kafkaconsumer)
//使用特定的函式將資料傳送到側輸出
.process(new processfunction[kafkadata, kafkadata] else 遲到了 :" + value.delaytime + "秒")}}
})// 常規資料處理
datasource.print()
// 對側輸出的資料處理
datasource.getsideoutput(delayoutputtag).print()
env.execute("side outputs test")
}
我們往kafka中寫入兩條資料
1,1560522012
1,1560522698
程式輸出結果:對不同分割源的資料做了不同的處理
4> 資料 kafkadata(1,1560522012) 遲到了 :697秒
3> kafkadata(1,1560522698)
flink中也提供了專對遲到資料側輸出的方法:sideoutputlatedata
使用方式:
val delayoutputtag = outputtag[t]("delay-side-output")
val maindata =input
.keyby()
.window()
.sideoutputlatedata(delayoutputtag)
val delaydatastream = ,maindata.getsideoutput(delayoutputtag)
參考資料
Flink側輸出流(Side Output)
需求 如果溫度值小於32f,就將報警資訊輸出到側輸出流中 package com.run.wc import org.apache.flink.streaming.api.scala.import org.apache.flink.api.scala.import org.apache.flink....
Flink 側輸出流(SideOutput)
要求 將小於32 輸入到側輸入流,大於32 輸入到主流 import org.apache.flink.api.scala.import org.apache.flink.streaming.api.timecharacteristic import org.apache.flink.streami...
flink的側輸出流
在 flink 處理資料流時,我們經常會遇到這樣的情況 在處理乙個資料來源時,往往需要將該源中的不同型別的資料做分割處理,如果使用 filter 運算元對資料來源進行篩選分割的話,勢必會造成資料流的多次複製,造成不必要的效能浪費 flink 中的側輸出就是將資料流進行分割,而不對流進行複製的一種分流...