Flink學習筆記(五) flink資料合流

2021-10-23 11:10:38 字數 2798 閱讀 9386

上一章記錄了flink的分流操作,那麼有分流是不是應該有合流呢?

當然是有這樣的操作啦

stream1和stream2流需要合併為stream流

1. union合流

2. connect合流

前置配置

streamexecutionenvironment env = streamexecutionenvironment.

getexecutionenvironment()

;env.

setstreamtimecharacteristic

(timecharacteristic.eventtime)

;env.

setparallelism(1

);//kafka基本配置,0.8版本

properties properties =

newproperties()

;properties.

setproperty

("zookeeper.connect"

,"zk位址");

properties.

put(

"bootstrap.servers"

,"kafka位址");

properties.

put(

"group.id"

,"groupid");

properties.

put(

"key.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer");

properties.

put(

"value.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer");

properties.

put(

"auto.offset.reset"

,"earliest");

//獲取日誌

flinkkafkaconsumer08

kafkasource1 =

newflinkkafkaconsumer08

<

>

("topic1"

,new

******stringschema()

, properties)

;//獲取flink資料流

datastream

logsource1 = env.

addsource

(kafkasource)

;flinkkafkaconsumer08

kafkasource2 =

newflinkkafkaconsumer08

<

>

("topic2"

,new

******stringschema()

, properties)

;//獲取flink資料流

datastream

logsource2 = env.

addsource

(kafkasource)

;datastream

datastream1 = logsource1.

map(

newmapfunction

()})

;datastream

datastream2 = logsource2.

map(

newmapfunction

()})

;

union合流

注意:union合流只能合併相同型別的流

datastream

union = datastream1.

union

(datastream2)

;

connect合流

注意:connect合流可以合併不同型別的流

connectedstreams

connect = datastream1.

connect

(datastream2)

;

那麼合併好的流如何處理呢,如下所示,將兩個流轉換為乙個流進行處理

//方式一:用comap處理合併後的流

singleoutputstreamoperator

result = connect.

map(

newcomapfunction

()//定義第二個流的處理邏輯

@override

public string map2

(object2 object2)

throws exception })

;//方式二:用coflatmap處理合併後的流

singleoutputstreamoperator

result = connect.

flatmap

(new

coflatmapfunction

()@override

public

void

flatmap2

(object1 object2, collector

collector)

throws exception })

;

flink學習 flink架構

flink結構 graph 2個併發度 source為1個併發度 的sockettextstreamwordcount四層執行圖的演變過程 jobgraph streamgraph經過優化後生成了 jobgraph,提交給 jobmanager 的資料結構。executiongraph jobman...

Flink學習筆記1 Flink框架api介紹

1.獲得 execution 環境 getexecutionenvironment createlocalenvironment createremoteenvironment string host,int port,string.jarfiles 批處理示例 executionenvironme...

Flink學習筆記2 Flink框架api介紹

使用 transform 函式 mapfunction 介面 其中泛型的第一 string 代表輸入型別,第二個 integer 代表輸出型別 class mymapfunction implements mapfunction data.map newmymapfunction lambda表示式...