datasource型別運算元
val environment: executionenvironment = executionenvironment.getexecutionenvironment
// 支援多種collection的具體型別
val datasource1: dataset[
string
]= environment.fromcollection(array(
"a",
"b",
"c",
"d")
)// 支援tuple,自定義物件等復合形式
val datasource2: dataset[
string
]= environment.fromelements(
"a",
"b",
"c",
"d")
// 基於迭代的sequence的dataset
val datasource3: dataset[
long
]= environment.generatesequence(1,
20)// 基於本地檔案的讀取
val datasource4: dataset[
string
]= environment.readtextfile(
"word.txt"
)// 讀取hdfs檔案
val datasource5: dataset[
string
]= environment.readtextfile(
"hdfs://node01:9000/word.txt"
)// 讀取csv檔案,可以直接指定泛型
class wordcount(word:
string
,num:
int)
val datasource6: dataset[wordcount]
= environment.readcsvfile[wordcount]
("word.csv"
)// 讀取壓縮檔案,flink會自動識別出壓縮型別,使用對應的方式進行解壓
val datasource7 = environment.readtextfile(
"word.tar.gz"
)
flink批處理transformation運算元
ransformation
說明map
將dataset中的每乙個元素轉換為另外乙個元素
flatmap
將dataset中的每乙個元素轉換為0…n個元素
將乙個分割槽中的元素轉換為另乙個元素
filter
過濾出來一些符合條件的元素
reduce
可以對乙個dataset或者乙個group來進行聚合計算,最終聚合成乙個元素
reducegroup
將乙個dataset或者乙個group聚合成乙個或多個元素
aggregate
按照內建的方式來進行聚合。例如:sum/min/max…
distinct
去重join
將兩個dataset按照一定條件連線到一起,形成新的dataset
union
將兩個dataset取並集,並不會去重
rebalance
讓每個分割槽的資料均勻分布,避免資料傾斜
partitionbyhash
按照指定的key進行hash分割槽
sortpartition
指定欄位對分割槽中的資料進行排序
sum運算元
def main(args: array[
string])
:unit
=
reduce運算元
def main(args: array[
string])
:unit
=
aggregate
def main(args: array[
string])
:unit
=
累加器accumulator
def main(args: array[
string])
:unit
=override
def map(in:
string):
string=}
) res.writeastext(
"data/wordcount"
,filesystem.writemode.overwrite)
val result: jobexecutionresult = environment.execute(
"wordcount"
) println(result.getaccumulatorresult(
"wordcounter"))
}
廣播變數broadcast
def main(args: array[
string])
:unit
=override
def open(parameters: configuration)
:unit=}
).withbroadcastset(score,
"score"
).print(
)}
分布式快取檔案
def main(args: array[
string])
:unit=}
override
def map(in:
string):
string=}
) result.print(
)}
Flink學習系列之二 Flink批處理
此時我們可以使用flink的批處理,我的data目錄下有a.txt檔案,輸入任意的單詞,然後我們開始統計。如下 public class batchhandler groupby 0 sum 1 filepath 檔案輸出結果檔案 n 以換行符作為每行結束條件 以空格分割單詞 setparallel...
flink批處理中的source以及sink介紹
flink在批處理中常見的source主要有兩大類 1.基於本地集合的source collection based source 2.基於檔案的source file based source 1.基於本地集合的source 在flink最常見的建立dataset方式有三種。1.使用env.fro...
flink學習 flink架構
flink結構 graph 2個併發度 source為1個併發度 的sockettextstreamwordcount四層執行圖的演變過程 jobgraph streamgraph經過優化後生成了 jobgraph,提交給 jobmanager 的資料結構。executiongraph jobman...