六 DataStream API 之 執行環境

2022-10-11 18:39:09 字數 3621 閱讀 4593

flink 程式可以在各種上下文環境中執行:可以在本地 jvm 中執行程式,也可以提交到遠端集群上執行。

不同的環境,**的提交執行的過程會有所不同。這就要求在提交作業執行計算時, 首先必須獲取當前 flink 的執行環境,從而建立起與 flink 框架之間的聯絡。只有獲取了環境上下文資訊,才能將具體的任務排程到不同的taskmanager 執行。

編 寫 flink 程 序 的 第 一 步,就 是 創 建 執 行 環 境 。 要 獲 取 的 執 行 環 境 ,是streamexecutionenvironment類的物件,這是所有 flink 程式的基礎。

在**中建立執行環境的 方式,就是呼叫這個類的靜態方法,具體有以下三種。

getexecutionenvironment

最簡單的方式,就是直接呼叫getexecutionenvironment方法。它會根據當前執行的上下文直接得到正確的結果:

也就是說,這個方法會根據當前執行的方式,自行決定該返回什麼樣的執行環境

streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();
這種「智慧型」的方式不需要我們額外做判斷,用起來簡單高效,是最常用的一種建立執行環境的方式。

createlocalenvironment

這個方法返回乙個本地執行環境。可以在呼叫時傳入乙個引數,指定預設的並行度;如果不傳入,則預設並行度就是本地的cpu核心數。

streamexecutionenvironment localenv = streamexecutionenvironment.createlocalenvironment();

createremoteenvironment

這個方法返回集群執行環境。需要在呼叫時指定jobmanager的主機名和埠號,並指定要在集群中執行的jar包。

streamexecutionenvironment remoteenv = streamexecutionenvironment

.createremoteenvironment(

"host", // jobmanager 主機名

1234, // jobmanager 程序埠號

"path/to/jarfile.jar" // 提交給 jobmanager 的 jar 包

);

在獲取到程式執行環境後,還可以對執行環境進行靈活的設定。比如可以全域性設定程式的並行度、禁用運算元鏈,還可以定義程式的時間語義、配置容錯機制。

wordcount程式獲取到的執行環境,是乙個streamexecutionenvironment,顧名思義它應該是做流處理的。那對於批處理,又應該怎麼獲取執行環境呢?

在之前的flink版本中,批處理的執行環境與流處理類似,是呼叫類executionenvironment的靜態方法,返回它的物件:

// 批處理環境

executionenvironment batchenv = executionenvironment.getexecutionenvironment();

// 流處理環境

streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

基於executionenvironment讀入資料建立的資料集合,就是dataset;對應的呼叫的一整套轉換方法,就是datasetapi。這些在批處理wordcount程式中已經有了基本了解。

而從1.12.0版本起,flink實現了api上的流批統一。datastreamapi新增了乙個重要特性:可以支援不同的「執行模式」(executionmode),通過簡單的設定就可以讓一段flink程式在流處理和批處理之間切換。這樣一來,datasetapi也就沒有存在的必要了。

batch 模式的配置方法

由於 flink 程式預設是 streaming 模式,這裡重點介紹一下 batch 模式的配置。

主要有兩種方式:

(1)通過命令列配置

在提交作業時,增加 execution.runtime-mode 引數,指定值為 batch。

bin/flink run -dexecution.runtime-mode=batch ...
(2)通過**配置(不推薦-靈活性差)

在**中,直接基於執行環境呼叫 setruntimemode 方法,傳入 batch 模式。

建議: 不要在**中配置,而是使用命令列。這同設定並行度是類似的:在提交作業時指定引數可以更加靈活,同一段應用程式寫好之後,既可以用於批處理也可以用於流處理。而在**中硬編碼(hard code)的方式可擴充套件性比較差,一般都不推薦

streamexecutionenvironment env = streamexecutionenvironment.getexecutionenvironment();

env.setruntimemode(runtimeexecutionmode.batch);

什麼時候選擇 batch 模式

flink 本身持有的就是流處理的世界觀,即使是批量資料,也可以看作「有界流」來進行處理。所以 streaming 執行模式對於有界資料和無界資料都是有效的;而 batch模式僅能用於有界資料。看起來 batch 模式似乎被 streaming 模式全覆蓋了,那還有必要存在嗎?能不能所有情況下都用流處理模式呢?

當然是可以的,但是這樣有時不夠高效。可以仔細回憶一下 word count 程式中,批處理和流處理輸出的不同:

最終的結果兩者是一致的,但是流處理模式會將更多的中間結果輸出。在本來輸入有界、只希望通過批處理得到最終的結果的場景下,

streaming 模式的逐個輸出結果就沒有必要了。

所以總結起來,乙個簡單的原則就是:

用 batch 模式處理批量資料,用 streaming模式處理流式資料。因為資料有界的時候,直接輸出結果會更加高效;而當資料無界的時候, 沒得選擇——只有 streaming 模式才能處理持續的資料流。

在後面的**中,即使是有界的資料來源,也會統一用 streaming 模式處理。這是因為主要目標還是構建實時處理流資料的程式,有界資料來源也只是用來測試的手段。

有了執行環境,就可以構建程式的處理流程了:基於環境讀取資料源,進而進行各種轉換操作,最後輸出結果到外部系統。

需要注意的是,寫完輸出(sink)操作並不代表程式已經結束。因為當 main()方法被呼叫時,其實只是定義了作業的每個執行操作,然後新增到資料流圖中;這時並沒有真正處理資料,因為資料可能還沒來。flink 是由事件驅動的,只有等到資料到來,才會觸發真正的計算,這也被稱為「延遲執行」或「懶執行」(lazy execution)。

所以需要顯式地呼叫執行環境的 execute()方法,來觸發程式執行。execute()方法將一直等待作業完成,然後返回乙個執行結果(jobexecutionresult)。

env.execute();

ruby 筆記之六

1.public protected private class person private def talk puts nihao endend p1 person.new p1.talk class person public talk endp1.talk 2.第七章開始 puts math...

非常道之六

前不久,南寧市一名年近70歲的退休老人在超市偷了兩本筆記本,事情敗露後,老人老老實實地承認了錯誤。派出所民警說,老人還供認半個月前在這家超市一次性地偷了10多本類似的筆記本。民警隨後來到老人家裡,看到老人上一次偷的10多本筆記本都還沒有用過。民警了解到,老人每個月的退休金不少,家庭經濟條件也不錯,便...

六 Python之迭代

通過for迴圈來遍歷python的集合,我們稱之為迭代,毫無疑問python的迭代具有更高的抽象度,迭代與按下標訪問陣列最大的不同是,後者是一種具體的迭代實現方式,而前者只關心迭代結果,根本不關心迭代內部是如何實現的。有的時候,我們確實想在 for 迴圈中拿到索引,怎麼辦?方法是使用 enumera...