def accumulator[t](initialvalue: t,name: string)(implicit param: org.apache.spark.accumulatorparam[t]): org.apache.spark.accumulator[t]
第乙個引數應是數值型別,是累加器的初始值,第二個引數是該累加器的命字,這樣就會在spark web ui中顯示,可以幫助你了解程式執行的情況。
下面看行累加器具體的例子:
val accum = sc.longaccumulator("longaccum") //統計奇數的個數
val sum = sc.parallelize(array(1,2,3,4,5,6,7,8,9),2).filter(n=>).reduce(_+_)
println("sum: "+sum)
println("accum: "+accum.value) 12
3456
78結果為:
sum: 20
accum: 5
這是結果正常的情況,但是在使用累加器的過程中如果對於spark的執行過程和運算模型理解的不夠深入就會遇到意想不到的錯誤。
下面看錯誤的情況:
val accum= sc.accumulator(0, "error accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1
val newdata = data.mapelse 1
}}//使用action操作觸發執行
newdata.count
//此時accum的值為5,是我們要的結果
accum.value
//繼續操作,檢視剛才變動的資料,foreach也是action操作
newdata.foreach(println)
//上個步驟沒有進行累計器操作,可是累加器此時的結果已經是10了
//這並不是我們想要的結果
accum.value12
3456
78910
1112
1314
1516
1718
19spark中的一系列transform操作會構成dag,此時需要通過乙個action操作來觸發,accumulator也是一樣。因此在乙個action操作之前,你呼叫value方法檢視其數值,肯定是沒有任何變化的。
所以在第一次count(action操作)之後,我們發現累加器的數值變成了5,是我們要的答案。
之後又對新產生的的newdata進行了一次foreach(action操作),其實這個時候又執行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結果變成了10。
這種問題如何解決呢?看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的準確性。事實上,還是有解決方案的,只要將任務之間的依賴關係切斷就可以了。什麼方法有這種功能呢?你們肯定都想到了,cache,persist。呼叫這個方法的時候會將之前的依賴切除,後續的累加器就不會再被之前的transfrom操作影響到了。
//val accum= sc.accumulator(0, "error accumulator")
val data = sc.parallelize(1 to 10)
//**和上方相同
val newdata = data.map}
//使用cache快取資料,切斷依賴。
newdata.cache.count
//此時accum的值為5
accum.value
newdata.foreach(println)
//此時的accum依舊是5
accum.value12
3456
78910
1112
1314
總之,使用accumulator時,為了保證準確性,只使用一次action操作。
Spark的累加器
val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...
Spark廣播變數與累加器
在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...
Spark累加器和廣播變數
累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...