在 spark 中,提供了兩種型別的共享變數:累加器 (accumulator) 與廣播變數 (broadcast variable):
這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的**進行計算,會發現執行結果並非預期:
var counter = 0
val data = array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
println(counter)
counter 最後的結果是 0,導致這個問題的主要原因是閉包。
1. scala 中閉包的概念
這裡先介紹一下 scala 中關於閉包的概念:
var more = 10
val addmore = (x: int) => x + more
如上函式addmore
中有兩個變數 x 和 more:
按照定義:在建立函式時,如果需要捕獲自由變數,那麼包含指向**獲變數的引用的函式就被稱為閉包函式。
2. spark 中的閉包
也可以參考:
在實際計算時,spark 會將對 rdd 操作分解為 task,task 執行在 worker node 上。在執行之前,spark 會對任務進行閉包,如果閉包內涉及到自由變數,則程式會進行拷貝,並將副本變數放在閉包中,之後閉包被序列化並傳送給每個執行者。因此,當在 foreach 函式中引用counter
時,它將不再是 driver 節點上的counter
,而是閉包中的副本counter
,預設情況下,副本counter
更新後的值不會回傳到 driver,所以counter
的最終值仍然為零。
需要注意的是:在 local 模式下,有可能執行foreach
的 worker node 與 diver 處在相同的 jvm,並引用相同的原始counter
,這時候更新可能是正確的,但是在集群模式下一定不正確。所以在遇到此類問題時應優先使用累加器。
累加器的原理實際上很簡單:就是將每個副本變數的最終值傳回 driver,由 driver 聚合後得到最終值,並更新原始變數。
sparkcontext
中定義了所有建立累加器的方法,需要注意的是:被中橫線劃掉的累加器方法在 spark 2.0.0 之後被標識為廢棄。
使用示例和執行結果分別如下:
val data = array(1, 2, 3, 4, 5)
// 定義累加器
val accum = sc.longaccumulator("my accumulator")
sc.parallelize(data).foreach(x => accum.add(x))
// 獲取累加器的值
accum.value
在上面介紹中閉包的過程中我們說道每個 task 任務的閉包都會持有自由變數的副本,如果變數很大且 task 任務很多的情況下,這必然會對網路 io 造成壓力,為了解決這個情況,spark 提供了廣播變數。
廣播變數的做法很簡單:就是不把副本變數分發到每個 task 中,而是將其分發到每個 executor,executor 中的所有 task 共享乙個副本變數。
// 把乙個陣列定義為乙個廣播變數
val broadcastvar = sc.broadcast(array(1, 2, 3, 4, 5))
// 之後用到該陣列時應優先使用廣播變數,而不是原值
sc.parallelize(broadcastvar.value).map(_ * 10).collect()
建立的accumulator變數的值能夠在spark web ui上看到,在建立時應該盡量為其命名,下面**如何在spark web ui上檢視累加器的值
大資料開發 Spark 共享變數之累加器和廣播變數
在 spark 中,提供了兩種型別的共享變數 累加器 accumulator 與廣播變數 broadcast variable 這裡先看乙個具體的場景,對於正常的累計求和,如果在集群模式中使用下面的 進行計算,會發現執行結果並非預期 var counter 0 val data array 1,2,...
Spark特性之共享變數
spark乙個非常重要的特性就是共享變數。預設情況下,如果在乙個運算元的函式中使用到了某個外部的變數,那麼這個變數的值會被拷貝到每個task中。此時每個task只能操作自己的那份變數副本。如果多個task想要共享某個變數,那麼這種方式是做不到的。spark為此提供了兩種共享變數,一種是broadca...
Spark共享變數
預設情況下,如果在乙個運算元的函式中使用到了某個外部的變數,那麼這個變數的值會被拷貝到每個task中。此時每個task只能操作自己的那份變數副本。如果多個task想要共享某個變數,那麼這種方式是做不到的。spark為此提供了兩種共享變數,一種是broadcast variable 廣播變數 另一種是...