10 累加器和廣播變數

2021-10-02 03:02:34 字數 912 閱讀 5047

在建立函式時,如果需要捕獲自由變數,那麼包含指向**獲變數的引用的函式就被稱為閉包函式。在實際計算時,spark 會將對 rdd 操作分解為 task,task 執行在 worker node 上。在執行之前,spark 會對任務進行閉包,如果閉包內涉及到自由變數,則程式會進行拷貝,並將副本變數放在閉包中,之後閉包被序列化並傳送給每個執行者。因此,當在 foreach 函式中引用 counter 時,它將不再是 driver 節點上的 counter,而是閉包中的副本 counter,預設情況下,副本 counter 更新後的值不會回傳到 driver,所以 counter 的最終值仍然為零。

自定義累加器new乙個之後,方法add之中就行了。

自定義的累加器需要繼承accumulatorv2並實現以下方法

class mapacc extends accumulatorv2[long,map[string,double]]

override def reset(): unit = map = map[string, double]()

override def add(v: long): unit =

override def merge(other: accumulatorv2[long, map[string, double]]): unit = other match

override def value: map[string, double] =

}

呼叫時new乙個分割槽器,然後使用add方法進行遞增

注意:累加器最好用在行動運算元之中

如果說累加器是共寫變數,那麼廣播變數就是共讀變數。廣播變數通過呼叫sparkcontext.broadcast(v)來建立.實際是對v的乙個包裝,廣播後在所有的節點上都可以通過.value獲得該值。

關於廣播變數和累加器

廣播變數 groadcast varible 為唯讀變數,使用廣播變數的好處 每個節點的executor有乙個副本,不是每個task有乙個副本,可以優化資源提高效能,比如機器學習的時候。累加器 累加器可以在各個executor之間共享,修改,其中有幾種建立方法 objectaccumulatorte...

spark RDD累加器和廣播變數

在預設情況下,當spark在集群的多個不同節點的多個任務上並行執行乙個函式時,它會把函式中涉及到的每個變數,在每個任務 上都生成乙個副本。但是,有時候需要在多個任務之間共享變數,或者在任務 task 和任務控制節點 driver program 之間共享變數。為了滿足這種需求,spark提供了兩種型...

Spark累加器和廣播變數

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...