Flink流處理之迭代案例

2021-09-09 05:19:31 字數 4241 閱讀 8600

我們在學習flink時一般都離不開flink官網,而我們通常都要先學會「example」,apche的開源專案一般都會有這個目錄,今天就說一下flink的example中的流處理的迭代

官網流處理的迭代位址

首先。基於輸入流構建iterativestream。這是乙個迭代的起始。通常稱之為迭代頭:

iterativestream

iteration = inputstream.

iterate()

;

接著。我們指定一系列的轉換操作用於表述在迭代過程中執行的邏輯(這裡簡單以map轉換作為演示樣例)。map api所接受的udf就是我們上文所說的步函式:

datastream

iteratedstream = iteration.

map(

/* this is executed many times */

);

然後。作為迭代我們肯定須要有資料反饋給迭代頭進行反覆計算,所以我們從迭代過的流中過濾出符合條件的元素組成的部分流,我們稱之為反饋流:

datastream

feedbackstream = iteratedstream.

filter

(/* one part of the stream */

);

將反饋流反饋給迭代頭就意味著乙個迭代的完整邏輯的完畢,那麼它就能夠「關閉」這個閉合的「環」了。通過呼叫iterativestream的closewith這一例項方法能夠關閉乙個迭代(也可表述為定義了迭代尾)。傳遞給closewith的資料流將會反饋給迭代頭:

iteration.

closewith

(feedbackstream)

;

另外,乙個慣用的模式是過濾出須要繼續向前分發的部分流,這個過濾轉換事實上定義的是「終止迭代」的邏輯條件,符合條件的元素將被分發給下游而不用於進行下一次迭代:

datastream

output = iteratedstream.

filter

(/* some other part of the stream */

);

主幹程式**例如以下:

public

static

void

main

(string[

] args)

throws exception

首先,我們先通過source函式建立初始的流物件inputstream:

datastream

> inputstream = env.

addsource

(new

randomfibonaccisource()

);

該source函式會生成二元組序列,二元組的兩個字段值是隨機生成的作為斐波那契數列的初始值:

private

static

class

randomfibonaccisource

implements

sourcefunction

>

}public

void

cancel()

}

為了對新計算的斐波那契數列中的值以及累加的迭代次數進行儲存,我們須要將二元組資料流轉換為五元組資料流,並據此建立迭代物件:

iterativestream

> iterativestream =

inputstream.

map(

newtupletransformmapfunction()

).iterate

(5000

);

元組轉換的map函式實現:

private

static

class

tupletransformmapfunction

extends

richmapfunction

integer>

, tuple5

>

}

上面五元組中,當中索引為0。1這兩個位置的元素,始終都是最初生成的兩個元素不會變化,而後三個欄位都會隨著迭代而變化。

在迭代流iterativestream建立完畢之後,我們將基於它執行斐波那契數列的步函式並產生斐波那契數列流fibonaccistream:

datastream

> fibonaccistream =

iterativestream.

map(

newfibonaccicalcstepfunction()

);

這裡的fibonaccistream僅僅是乙個代稱,當中的資料並非真正的斐波那契數列,事實上就是上面那個五元組。

當中用於計算斐波那契數列的步函式實現例如以下:

private

static

class

fibonaccicalcstepfunction

extends

richmapfunction

,

tuple5

>

}

每乙個元素計算斐波那契數列的新值並產生了fibonaccistream,可是我們須要對最新的兩個值進行推斷。看它們是否超過了指定的閾值。超過了閾值的元組將會被輸出,而沒有超過的則會再次參與迭代。因此這將產生兩個不同的分支。我們也為此構建了分支流:

splitstream

> branchedstream =

fibonaccistream.

split

(new

fibonaccioverflowselector()

);

而對是否超過閾值的元組進行推斷並分離的實現例如以下:

private

static

class

fibonaccioverflowselector

implements

outputselector

<

tuple5

>

return collections.

singleton

(output_flag);}

}

在篩選方法select中,我們對不同的分支以不同的常量識別符號進行標識:iterate_flag(還要繼續迭代)和output_flag(直接輸出)。

產生了分支流之後。我們就能夠從中檢出不同的流分支做迭代或者輸出處理。

對須要再次迭代的,就通過迭代流的closewith方法反饋給迭代頭:

iterativestream.

closewith

(branchedstream.

select

(iterate_flag)

);

而對於不須要的迭代就直接讓其流向下游處理,這裡我們僅僅是簡單得將流「重構」了一下然後直接輸出:

datastream

> outputstream = branchedstream

.select

(output_flag)

.map

(new

buildoutputtuplemapfunction()

);outputstream.

print()

;

所謂的重構就是將之前的五元組又一次縮減為三元組,實現例如以下:

private

static

class

buildoutputtuplemapfunction

extends

richmapfunction

<

tuple5

,

tuple3

>

}

Flink 流處理之source簡介

1 從集合讀取資料 定義樣例類 水位感測器 用於接收空高資料 id 感測器編號 ts 時間戳 vc 空高 case class watersensor id string,ts long,vc double object source collection 2 從檔案讀取資料 val env str...

Flink 流處理WordCount 示例

然後開啟cmd視窗使用 使用命令 nc lp 8888即可開啟監聽 8888 埠號。如下圖 進行分組聚合 keyby 將key相同的分到乙個組中 singleoutputstreamoperator resultdatastream wordandone.keyby 0 sum 1 transfor...

flink學習(3) 流處理API

environment getexecutionenvironment 建立乙個執行環境,表示當前執行程式的上下文 批處理使用executionenvironment呼叫 流處理使用streamexecutionenvironment呼叫 執行環境的 變數 可以通過 setparalleism設定全...