spark的三大資料結構
rdd:分布式資料集
廣播變數:分布式唯讀共享變數
累加器:分布式只寫共享變數
1.累加器
預設累加器
例子:對乙個list中的所有值進行相加
首先上圖中紅色部分框出來的**,看上去邏輯沒有什麼大問題,但是輸出的結果sum=0。這是因為,sum在driver中被定義,在不同的executor中計算,每個executor得到值既不能彼此相加,也不能傳回driver輸出,所以導致driver中sum的值一直沒有變過。
這時候就可以採用累加器,因為driver和各個executor都需要使用這個資料,所以在這裡定義乙個只寫共享變數是合適的。累加器解決的問題就是資料原本不能從executor傳回driver的問題。
自定義累加器
例子:取出含有"u"的字串,累加
class wordaccumulator extends accumulatorv2[string,util.arraylist[string]]
//複製累加器物件
override def copy(): accumulatorv2[string, util.arraylist[string]] =
//重置累加器
override def reset(): unit =
//實現累加器的邏輯
override def add(v: string): unit =
} //合併累加器
override def merge(other: accumulatorv2[string, util.arraylist[string]]): unit =
//獲取累加器的結果
override def value: util.arraylist[string] = list
}
2.廣播變數//建立spark上下文物件
val sc = new sparkcontext(config)
val value = sc.makerdd(list("us","tomoon","oneus","you","loc"))
val accumulator = new wordaccumulator
//需要註冊一下
sc.register(accumulator)
value.foreach
}println(accumulator.value)
sc.stop()
使用的時候只要把原本的資料通過broadcast()轉化成廣播變數,使用的時候通過broadcast.value使用即可。
Spark累加器和廣播變數
累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...
spark的廣播變數和累加器
廣播變數 廣播變數允許開發人員在每個節點快取唯讀的變數,而不是在任務之間傳遞這些變數。例如,使用廣播變數能夠高效地 在集群每個節點建立大資料的副本。同時,spark還使用高效的廣播演算法分發這些變數,從而減少通訊的開銷。spark應用程式作業的執行由一系列排程階段構成,而這些排程階段通過shuffl...
Spark廣播變數與累加器
在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...