Flink 流處理之source簡介

2021-10-06 09:02:53 字數 2189 閱讀 3227

1 從集合讀取資料

// 定義樣例類:水位感測器:用於接收空高資料

// id:感測器編號

// ts:時間戳

// vc:空高

case class watersensor(id:string, ts:long, vc:double)

object source_collection

}

2 從檔案讀取資料

val env: streamexecutionenvironment =

streamexecutionenvironment.getexecutionenvironment

val fileds: datastream[string] = env.readtextfile(

"input/data.txt"

)fileds.print(

)env.execute(

"sensor"

)

3 以kafka訊息佇列的資料作為**

引入kafka聯結器的依賴:

org.apache.flink

flink-connector-kafka-0.11_2.11

1.7.2

**實現:

org.apache.flink

flink-connector-kafka-0.11_2.11

1.7.2

**實現:

val env: streamexecutionenvironment = streamexecutionenvironment.getexecutionenvironment

val properties = new properties(

)properties.setproperty(

"bootstrap.servers"

,"hadoop1:9092"

)properties.setproperty(

"group.id"

,"consumer-group"

)properties.setproperty(

"key.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer"

)properties.setproperty(

"value.deserializer"

,"org.apache.kafka.common.serialization.stringdeserializer"

)properties.setproperty(

"auto.offset.reset"

,"latest"

)val kafkads: datastream[string] = env.addsource(

new flinkkafkaconsumer011[string]

("sensor"

, new ******stringschema(

), properties)

)kafkads.print(

)env.execute(

"sensor"

)

4 自定義source
val env: streamexecutionenvironment =

streamexecutionenvironment.getexecutionenvironment

val mysensords: datastream[watersensor] = env.addsource(

new mysensorsource())

mysensords.print(

)env.execute(

"sensor"

)

mysensorsource具體的**實現如下:

class mysensorsource extends sourcefunction[watersensor]

} override def cancel(

): unit =

}

Flink流處理之迭代案例

我們在學習flink時一般都離不開flink官網,而我們通常都要先學會 example apche的開源專案一般都會有這個目錄,今天就說一下flink的example中的流處理的迭代 官網流處理的迭代位址 首先。基於輸入流構建iterativestream。這是乙個迭代的起始。通常稱之為迭代頭 it...

flink批處理中的source以及sink介紹

flink在批處理中常見的source主要有兩大類 1.基於本地集合的source collection based source 2.基於檔案的source file based source 1.基於本地集合的source 在flink最常見的建立dataset方式有三種。1.使用env.fro...

Flink 流處理WordCount 示例

然後開啟cmd視窗使用 使用命令 nc lp 8888即可開啟監聽 8888 埠號。如下圖 進行分組聚合 keyby 將key相同的分到乙個組中 singleoutputstreamoperator resultdatastream wordandone.keyby 0 sum 1 transfor...