Spark累加器和廣播變數

2021-10-13 12:50:30 字數 1349 閱讀 7610

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和、累加合併元素等。

假設我們有乙個word.txt文字,我們想要統計該文字中單詞「sheep」的行數,我們可以直接讀取文字filter過濾然後計數。

sc.textfile("word.txt").filter(_.contains("sheep")).count()
假設我們想分別統計文字中單詞"sheep""wolf"的行數,如果按照上述方法需要計算兩次

sc.textfile("word.txt").filter(_.contains("sheep")).count()

sc.textfile("word.txt").filter(_.contains("wolf")).count()

如果要分別統計100個單詞的行數,則要計算100次

如果使用累加器,則只需要讀一次即可

val count1=sc.acccumlator(0)

val count2=sc.acccumlator(0)

...def processline(line:string):unit

if(line.contains("wolf"))

...}sc.textfile("word.txt").foreach(processline(_))

不僅int型別可以累加,long、double、collection也可以累加,還可以進行自定義,而且這個變數可以在spark的webui介面看到。

注意:累加器只能在driver端定義和讀取,不能在executor端讀取

廣播變數允許快取乙個唯讀的變數在每台機器(worker)上面,而不是每個任務(task)儲存乙份備份。利用廣播變數能夠以一種更有效率的方式將乙個大資料量輸入集合的副本分配給每個節點。

廣播變數通過兩個方面提高資料共享效率:

(1)集群中每個節點(物理機器)只有乙個副本,預設的閉包是每個任務乙個副本;

val list=sc.parallize(0 to 10)

val brdlist=sc.broadcast(list)

sc.textfile("test.txt").filter(brdlist.value.contains(_.toint)).foreach(println)

使用時,需注意:

(1)適用於小變數分發,對於動則幾十m的變數,每個任務都傳送一次既消耗記憶體,也浪費時間

(2)廣播變數只能在driver端定義,在executor端讀取,executor不能修改

spark的廣播變數和累加器

廣播變數 廣播變數允許開發人員在每個節點快取唯讀的變數,而不是在任務之間傳遞這些變數。例如,使用廣播變數能夠高效地 在集群每個節點建立大資料的副本。同時,spark還使用高效的廣播演算法分發這些變數,從而減少通訊的開銷。spark應用程式作業的執行由一系列排程階段構成,而這些排程階段通過shuffl...

spark之累加器和廣播變數

spark的三大資料結構 rdd 分布式資料集 廣播變數 分布式唯讀共享變數 累加器 分布式只寫共享變數 1.累加器 預設累加器 例子 對乙個list中的所有值進行相加 首先上圖中紅色部分框出來的 看上去邏輯沒有什麼大問題,但是輸出的結果sum 0。這是因為,sum在driver中被定義,在不同的e...

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...