我們在學習flink時一般都離不開flink官網,而我們通常都要先學會「example」,apche的開源專案一般都會有這個目錄,今天就說一下flink的example中的流處理的迭代
官網流處理的迭代位址
首先。基於輸入流構建iterativestream。這是乙個迭代的起始。通常稱之為迭代頭:
iterativestream
iteration = inputstream.
iterate()
;
接著。我們指定一系列的轉換操作用於表述在迭代過程中執行的邏輯(這裡簡單以map轉換作為演示樣例)。map api所接受的udf就是我們上文所說的步函式:
datastream
iteratedstream = iteration.
map(
/* this is executed many times */
);
然後。作為迭代我們肯定須要有資料反饋給迭代頭進行反覆計算,所以我們從迭代過的流中過濾出符合條件的元素組成的部分流,我們稱之為反饋流:
datastream
feedbackstream = iteratedstream.
filter
(/* one part of the stream */
);
將反饋流反饋給迭代頭就意味著乙個迭代的完整邏輯的完畢,那麼它就能夠「關閉」這個閉合的「環」了。通過呼叫iterativestream的closewith這一例項方法能夠關閉乙個迭代(也可表述為定義了迭代尾)。傳遞給closewith的資料流將會反饋給迭代頭:
iteration.
closewith
(feedbackstream)
;
另外,乙個慣用的模式是過濾出須要繼續向前分發的部分流,這個過濾轉換事實上定義的是「終止迭代」的邏輯條件,符合條件的元素將被分發給下游而不用於進行下一次迭代:
datastream
output = iteratedstream.
filter
(/* some other part of the stream */
);
主幹程式**例如以下:
public
static
void
main
(string[
] args)
throws exception
首先,我們先通過source函式建立初始的流物件inputstream:
datastream
> inputstream = env.
addsource
(new
randomfibonaccisource()
);
該source函式會生成二元組序列,二元組的兩個字段值是隨機生成的作為斐波那契數列的初始值:
private
static
class
randomfibonaccisource
implements
sourcefunction
>
}public
void
cancel()
}
為了對新計算的斐波那契數列中的值以及累加的迭代次數進行儲存,我們須要將二元組資料流轉換為五元組資料流,並據此建立迭代物件:
iterativestream
> iterativestream =
inputstream.
map(
newtupletransformmapfunction()
).iterate
(5000
);
元組轉換的map函式實現:
private
static
class
tupletransformmapfunction
extends
richmapfunction
integer>
, tuple5
>
}
上面五元組中,當中索引為0。1這兩個位置的元素,始終都是最初生成的兩個元素不會變化,而後三個欄位都會隨著迭代而變化。在迭代流iterativestream建立完畢之後,我們將基於它執行斐波那契數列的步函式並產生斐波那契數列流fibonaccistream:
datastream
> fibonaccistream =
iterativestream.
map(
newfibonaccicalcstepfunction()
);
這裡的fibonaccistream僅僅是乙個代稱,當中的資料並非真正的斐波那契數列,事實上就是上面那個五元組。當中用於計算斐波那契數列的步函式實現例如以下:
private
static
class
fibonaccicalcstepfunction
extends
richmapfunction
,
tuple5
>
}
每乙個元素計算斐波那契數列的新值並產生了fibonaccistream,可是我們須要對最新的兩個值進行推斷。看它們是否超過了指定的閾值。超過了閾值的元組將會被輸出,而沒有超過的則會再次參與迭代。因此這將產生兩個不同的分支。我們也為此構建了分支流:
splitstream
> branchedstream =
fibonaccistream.
split
(new
fibonaccioverflowselector()
);
而對是否超過閾值的元組進行推斷並分離的實現例如以下:
private
static
class
fibonaccioverflowselector
implements
outputselector
<
tuple5
>
return collections.
singleton
(output_flag);}
}
在篩選方法select中,我們對不同的分支以不同的常量識別符號進行標識:iterate_flag(還要繼續迭代)和output_flag(直接輸出)。
產生了分支流之後。我們就能夠從中檢出不同的流分支做迭代或者輸出處理。
對須要再次迭代的,就通過迭代流的closewith方法反饋給迭代頭:
iterativestream.
closewith
(branchedstream.
select
(iterate_flag)
);
而對於不須要的迭代就直接讓其流向下游處理,這裡我們僅僅是簡單得將流「重構」了一下然後直接輸出:
datastream
> outputstream = branchedstream
.select
(output_flag)
.map
(new
buildoutputtuplemapfunction()
);outputstream.
print()
;
所謂的重構就是將之前的五元組又一次縮減為三元組,實現例如以下:
private
static
class
buildoutputtuplemapfunction
extends
richmapfunction
<
tuple5
,
tuple3
>
}
Flink 流處理之source簡介
1 從集合讀取資料 定義樣例類 水位感測器 用於接收空高資料 id 感測器編號 ts 時間戳 vc 空高 case class watersensor id string,ts long,vc double object source collection 2 從檔案讀取資料 val env str...
Flink 流處理WordCount 示例
然後開啟cmd視窗使用 使用命令 nc lp 8888即可開啟監聽 8888 埠號。如下圖 進行分組聚合 keyby 將key相同的分到乙個組中 singleoutputstreamoperator resultdatastream wordandone.keyby 0 sum 1 transfor...
flink學習(3) 流處理API
environment getexecutionenvironment 建立乙個執行環境,表示當前執行程式的上下文 批處理使用executionenvironment呼叫 流處理使用streamexecutionenvironment呼叫 執行環境的 變數 可以通過 setparalleism設定全...