The previous chapter covered Flink's stream splitting, so if streams can be split, surely they can also be merged?
Of course they can.
Suppose stream1 and stream2 need to be merged into a single stream. Flink provides two ways to do this:
1. union merge
2. connect merge
Prerequisite setup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

// Basic Kafka configuration (0.8 client)
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "zk address");
properties.put("bootstrap.servers", "kafka address");
properties.put("group.id", "groupId");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.offset.reset", "earliest");
// Read the logs from Kafka
FlinkKafkaConsumer08<String> kafkaSource1 =
        new FlinkKafkaConsumer08<>("topic1", new SimpleStringSchema(), properties);
// Obtain the Flink data stream
DataStream<String> logSource1 = env.addSource(kafkaSource1);

FlinkKafkaConsumer08<String> kafkaSource2 =
        new FlinkKafkaConsumer08<>("topic2", new SimpleStringSchema(), properties);
// Obtain the Flink data stream
DataStream<String> logSource2 = env.addSource(kafkaSource2);

// Parse the raw log strings into the business types Object1 and Object2
DataStream<Object1> dataStream1 = logSource1.map(new MapFunction<String, Object1>() {
    // map(): parse the log String into an Object1 (implementation omitted)
});
DataStream<Object2> dataStream2 = logSource2.map(new MapFunction<String, Object2>() {
    // map(): parse the log String into an Object2 (implementation omitted)
});
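If you do not have a Kafka 0.8 cluster at hand, the two source streams can be stubbed out locally so that the merge examples below can still be tried. The following is only an illustrative sketch, not part of the original setup: the class name MergeDemo, the placeholder element types (String and Integer) and the in-memory test values are all assumptions for demonstration.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MergeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // two in-memory streams standing in for the two Kafka topics above
        DataStream<String> stream1 = env.fromElements("a", "b", "c");
        DataStream<Integer> stream2 = env.fromElements(1, 2, 3);

        // ... the union / connect snippets from the sections below go here ...

        env.execute("merge demo");
    }
}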
union merge
Note: union can only merge streams of the same element type.
// union requires both inputs to share one element type; assume here that both streams carry Object1
DataStream<Object1> union = dataStream1.union(dataStream2);
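As a concrete, runnable illustration (a minimal sketch with placeholder stream names and values, building on the local stub above), union can merge two or more same-typed streams in a single call, and the result simply contains every element of all inputs:

DataStream<String> s1 = env.fromElements("a", "b");
DataStream<String> s2 = env.fromElements("c", "d");
DataStream<String> s3 = env.fromElements("e", "f");

// union is variadic: any number of streams, all with the same element type
DataStream<String> merged = s1.union(s2, s3);
merged.print();   // prints a, b, c, d, e, f (interleaving order not guaranteed)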
connect merge
Note: connect can merge streams of different element types.
ConnectedStreams<Object1, Object2> connect = dataStream1.connect(dataStream2);
So how is the merged result processed? As shown below, the two connected streams are handled together as a single stream.
// Method 1: process the connected streams with a CoMapFunction
SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Object1, Object2, String>() {
    // processing logic for the first stream
    @Override
    public String map1(Object1 object1) throws Exception {
        // ... (body omitted)
    }

    // processing logic for the second stream
    @Override
    public String map2(Object2 object2) throws Exception {
        // ... (body omitted)
    }
});
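For a version that compiles end to end, here is a minimal sketch built on the placeholder stream1 (String) and stream2 (Integer) from the local stub above; the String output type and the "str:"/"int:" prefixes are illustrative assumptions only.

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

ConnectedStreams<String, Integer> connected = stream1.connect(stream2);

SingleOutputStreamOperator<String> mapped = connected.map(
        new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {
                // called for every element of the first (String) stream
                return "str: " + value;
            }

            @Override
            public String map2(Integer value) {
                // called for every element of the second (Integer) stream
                return "int: " + value;
            }
        });
mapped.print();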
// Method 2: process the connected streams with a CoFlatMapFunction
SingleOutputStreamOperator<String> result = connect.flatMap(new CoFlatMapFunction<Object1, Object2, String>() {
    // processing logic for the first stream
    @Override
    public void flatMap1(Object1 object1, Collector<String> collector) throws Exception {
        // ... (body omitted)
    }

    // processing logic for the second stream
    @Override
    public void flatMap2(Object2 object2, Collector<String> collector) throws Exception {
        // ... (body omitted)
    }
});
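And the corresponding runnable sketch for the CoFlatMapFunction variant, again assuming the placeholder String/Integer streams and the connected variable from the previous sketch. Unlike map1/map2, each flatMap call may emit zero, one, or many output records through the Collector; the even-number filter below is just an illustrative choice.

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

SingleOutputStreamOperator<String> flatMapped = connected.flatMap(
        new CoFlatMapFunction<String, Integer, String>() {
            @Override
            public void flatMap1(String value, Collector<String> out) {
                // emit one record per String element
                out.collect("str: " + value);
            }

            @Override
            public void flatMap2(Integer value, Collector<String> out) {
                // emit only even numbers, dropping the rest
                if (value % 2 == 0) {
                    out.collect("even int: " + value);
                }
            }
        });
flatMapped.print();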