flink學習(3) 流處理API

2021-10-07 04:50:28 字數 2583 閱讀 9901

environment

getexecutionenvironment

建立乙個執行環境,表示當前執行程式的上下文

批處理使用executionenvironment呼叫  流處理使用streamexecutionenvironment呼叫

執行環境的 變數  可以通過 setparalleism設定全域性並行度 如果不設定 會以flink-conf。yaml中的配置為準 預設為1

datastream 基本轉換

一般先定義輸入資料的樣例類

case class inputdemo(id:string ,timestamp:long,temperature:double)

從集合中取資料 fromcollection

val stream1:datastream[inputdemo] = env.fromcollection(list(

inputdemo("1",121,35.6),

inputdemo("2",121,31.6)

inputdemo("3",123211,35.64)

inputdemo("4",12311,38.6)

列印輸出api print  執行計算api   execute  

從檔案中讀資料 readtextfile

socket文字流 souckettextstream

從kafka讀取資料 需要引入 flink-connector-kafka-011_2.11詳見 部落格 flink對接kafka

此時需要addsource方法 新增資料來源 引數需要傳遞乙個 方法 實現sourcefunction這個介面

連線kafka時 直接 在addsource方法 中

new flinkkafkaconsumer[型別](topic 名字 ,  反序列化 schema 如string型別 使用 new ******stringschema() ,kafka props)

同樣可以自定義source

定義乙個class  實現sourcefunction

class mysource() extends sourcefunction[inputdemo])

val onestream = splitstream.select("1")

val twostream = splitstream.select("2")

val allstream = splitstream.select("1","2")

//分別輸出 data.a >1的資料分別輸出 data.a <1的資料 以及全部資料合流 

connect() 可以搭配colmap colflatmap等方法 實現不同的流不同的處理方式  使用connect後 map 方法 實際上就是colmap 

val warningstream = onestream.map(

date => (date.id,data.temperature,"aaa")

)val connectstream:connectedstreamswar:[(string,double,string),inputdemo] warningstream.connect(twostream)

//此時connectstream呼叫map falatmap 等運算元實際上是呼叫colmap 需要傳兩個函式

val resultstream:datastream[object] = connectedstreams.map(

warningdata =>(warningdata._1,warningdata._2,"aaa"),

lowtemdata => (lowtemdata.id,"bb")

)//其實本質上 是對合流的兩個分別做處理 用分流和合流 組合 可以完美實現 if else分支 並實時處理

多流轉換運算元 split-select connect-comap/coflatmap 成對出現 先轉換成 splitstream,connectedstreams然後再通過select、 comap轉換回datastream 所謂comap 其實就是基於connectedstreams的map方法

union

單純的把兩個流合在一起 但是不能做分別處理

富函式(richfunction)

是datastream api提供的乙個函式類的介面,所有flink函式都有其rich版本 ,它與常規函式的不同在於,可以獲取執行環境的上下文,並擁有一些生命週期方法,所以可以用來實現更加複雜的功能

rich function 有乙個生命週期的概念

典型的有 open()是richfunction的初始化方法 當乙個運算元被呼叫前 open會被呼叫

close() 方法是生命週期最後乙個方法,做一些清理工作

getruntimecontext()方法提供了函式的runtimecontext的資訊 如執行的並行度,任務名 state狀態

復函式 因為可以獲取執行時上下文 在執行時 上下文 可以對state進行操作 所以flink的有狀態流式計算,做狀態程式設計 就是基於richfunction的

Flink 流處理WordCount 示例

然後開啟cmd視窗使用 使用命令 nc lp 8888即可開啟監聽 8888 埠號。如下圖 進行分組聚合 keyby 將key相同的分到乙個組中 singleoutputstreamoperator resultdatastream wordandone.keyby 0 sum 1 transfor...

Flink學習筆記1 Flink框架api介紹

1.獲得 execution 環境 getexecutionenvironment createlocalenvironment createremoteenvironment string host,int port,string.jarfiles 批處理示例 executionenvironme...

Flink學習筆記2 Flink框架api介紹

使用 transform 函式 mapfunction 介面 其中泛型的第一 string 代表輸入型別,第二個 integer 代表輸出型別 class mymapfunction implements mapfunction data.map newmymapfunction lambda表示式...