Flink實操 廣播變數 累加器 分布式快取

2021-10-20 20:14:42 字數 2904 閱讀 5876

三 .累加器

四 .分布式快取

flink支援廣播。可以將資料廣播到taskmanager上,資料儲存到記憶體中。

資料儲存在記憶體中,這樣可以減緩大量的shuffle操作;比如在資料join階段,不可避免的就是大量的shuffle操作,我們可以把其中乙個datastream廣播出去,一直載入到taskmanager的記憶體中,可以直接在記憶體中拿資料,避免了大量的shuffle,導致集群效能下降;

廣播變數建立後,它可以執行在集群中的任何function上,而不需要多次傳遞給集群節點。

另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。

// - 在`map`方法中使用廣播進行轉換

override

def map(value:

(int

,string

,int))

:(string

,string

,int)=

}).withbroadcastset(studentdataset,

"bc_student"

)// 3. 列印測試

resultdataset.print()}

}accumulator 即累加器,與 mapreduce counter 的應用場景差不多,都能很好地觀察task在執行期間的資料變化

可以在flink job任務中的運算元函式中操作累加器,但是只能在任務執行結束之後才能獲得累加器的最終結果。

flink現在有以下內建累加器。每個累加器都實現了accumulator介面。

遍歷下列資料, 列印出單詞的總數

"a","b","c","d"
開發步驟:獲取批處理環境

載入本地集合

map轉換

定義累加器

註冊累加器

累加資料

資料寫入到檔案中

執行任務,獲取任務執行結果物件(jobexecutionresult)

獲取累加器數值

列印數值

package com.boyi.broadcast

import org.apache.flink.api.common.accumulators.intcounter

import org.apache.flink.api.common.functions.richmapfunction

import org.apache.flink.api.scala.executionenvironment

import org.apache.flink.configuration.configuration

/** * counter 累加器

*/object batchdemocounter

var sum =0;

override

def map(value:

string)=

}).setparallelism(1)

res.writeastext(

"/opt/a/tmp/batchdemocounter"

)val jobresult = env.execute(

"batchdemocounterscala"

)// //3:獲取累加器

val num = jobresult.getaccumulatorresult[

int]

("num-lines"

) println(

"num:"

+num)

}}

flink提供了乙個類似於hadoop的分布式快取,讓並行執行例項的函式可以在本地訪問。

這個功能可以被使用來分享外部靜態的資料.

快取的使用流程:

使用executionenvironment例項對本地的或者遠端的檔案(例如:hdfs上的檔案),為快取檔案指定乙個名字註冊該快取檔案。當程式執行時候,flink會自動將複製檔案或者目錄到所有worker節點的本地檔案系統中,函式可以根據名字去該節點的本地檔案系統中檢索該檔案!

注意:廣播是將變數分發到各個worker節點的記憶體上,分布式快取是將檔案快取到各個worker節點上

遍歷下列資料, 並在open方法中獲取快取的檔案

a,b,c,d
import org.apache.commons.io.fileutils

import org.apache.flink.api.common.functions.richmapfunction

import org.apache.flink.api.scala.executionenvironment

import org.apache.flink.configuration.configuration

/** * 分布式快取

*/object batchdemodiscache

}override

def map(value:

string)=

})result.print()}

}

廣播變數與累加器

能不能將乙個rdd使用廣播變數廣播出去?不能 因為rdd是不存資料的。可以將rdd的結果廣播出去。廣播變數只能在driver端定義,不能在executor端定義。在driver端可以修改廣播變數的值,在executor端無法修改廣播變數的值。如果executor端用到了driver的變數,如果不使用...

關於廣播變數和累加器

廣播變數 groadcast varible 為唯讀變數,使用廣播變數的好處 每個節點的executor有乙個副本,不是每個task有乙個副本,可以優化資源提高效能,比如機器學習的時候。累加器 累加器可以在各個executor之間共享,修改,其中有幾種建立方法 objectaccumulatorte...

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...