三 .累加器
四 .分布式快取
flink支援廣播。可以將資料廣播到taskmanager上,資料儲存到記憶體中。
資料儲存在記憶體中,這樣可以減緩大量的shuffle操作;比如在資料join階段,不可避免的就是大量的shuffle操作,我們可以把其中乙個datastream廣播出去,一直載入到taskmanager的記憶體中,可以直接在記憶體中拿資料,避免了大量的shuffle,導致集群效能下降;
廣播變數建立後,它可以執行在集群中的任何function上,而不需要多次傳遞給集群節點。
另外需要記住,不應該修改廣播變數,這樣才能確保每個節點獲取到的值都是一致的。
// - 在`map`方法中使用廣播進行轉換
override
def map(value:
(int
,string
,int))
:(string
,string
,int)=
}).withbroadcastset(studentdataset,
"bc_student"
)// 3. 列印測試
resultdataset.print()}
}accumulator 即累加器,與 mapreduce counter 的應用場景差不多,都能很好地觀察task在執行期間的資料變化
可以在flink job任務中的運算元函式中操作累加器,但是只能在任務執行結束之後才能獲得累加器的最終結果。
flink現在有以下內建累加器。每個累加器都實現了accumulator介面。
遍歷下列資料, 列印出單詞的總數
"a","b","c","d"
開發步驟:獲取批處理環境
載入本地集合
map轉換
定義累加器
註冊累加器
累加資料
資料寫入到檔案中
執行任務,獲取任務執行結果物件(jobexecutionresult)
獲取累加器數值
列印數值
package com.boyi.broadcast
import org.apache.flink.api.common.accumulators.intcounter
import org.apache.flink.api.common.functions.richmapfunction
import org.apache.flink.api.scala.executionenvironment
import org.apache.flink.configuration.configuration
/** * counter 累加器
*/object batchdemocounter
var sum =0;
override
def map(value:
string)=
}).setparallelism(1)
res.writeastext(
"/opt/a/tmp/batchdemocounter"
)val jobresult = env.execute(
"batchdemocounterscala"
)// //3:獲取累加器
val num = jobresult.getaccumulatorresult[
int]
("num-lines"
) println(
"num:"
+num)
}}
flink提供了乙個類似於hadoop的分布式快取,讓並行執行例項的函式可以在本地訪問。
這個功能可以被使用來分享外部靜態的資料.
快取的使用流程:
使用executionenvironment例項對本地的或者遠端的檔案(例如:hdfs上的檔案),為快取檔案指定乙個名字註冊該快取檔案。當程式執行時候,flink會自動將複製檔案或者目錄到所有worker節點的本地檔案系統中,函式可以根據名字去該節點的本地檔案系統中檢索該檔案!
注意:廣播是將變數分發到各個worker節點的記憶體上,分布式快取是將檔案快取到各個worker節點上遍歷下列資料, 並在open方法中獲取快取的檔案
a,b,c,d
import org.apache.commons.io.fileutils
import org.apache.flink.api.common.functions.richmapfunction
import org.apache.flink.api.scala.executionenvironment
import org.apache.flink.configuration.configuration
/** * 分布式快取
*/object batchdemodiscache
}override
def map(value:
string)=
})result.print()}
}
廣播變數與累加器
能不能將乙個rdd使用廣播變數廣播出去?不能 因為rdd是不存資料的。可以將rdd的結果廣播出去。廣播變數只能在driver端定義,不能在executor端定義。在driver端可以修改廣播變數的值,在executor端無法修改廣播變數的值。如果executor端用到了driver的變數,如果不使用...
關於廣播變數和累加器
廣播變數 groadcast varible 為唯讀變數,使用廣播變數的好處 每個節點的executor有乙個副本,不是每個task有乙個副本,可以優化資源提高效能,比如機器學習的時候。累加器 累加器可以在各個executor之間共享,修改,其中有幾種建立方法 objectaccumulatorte...
Spark廣播變數與累加器
在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...