Spark累加器 Accumulator 使用詳解

2021-09-16 12:06:32 字數 1991 閱讀 8171

def accumulator[t](initialvalue: t,name: string)(implicit param: org.apache.spark.accumulatorparam[t]): org.apache.spark.accumulator[t] 

第乙個引數應是數值型別,是累加器的初始值,第二個引數是該累加器的命字,這樣就會在spark web ui中顯示,可以幫助你了解程式執行的情況。 

下面看行累加器具體的例子: 

val accum = sc.longaccumulator("longaccum") //統計奇數的個數  

val sum = sc.parallelize(array(1,2,3,4,5,6,7,8,9),2).filter(n=>).reduce(_+_)  

println("sum: "+sum)  

println("accum: "+accum.value)  12

3456

78結果為: 

sum: 20 

accum: 5

這是結果正常的情況,但是在使用累加器的過程中如果對於spark的執行過程和運算模型理解的不夠深入就會遇到意想不到的錯誤。

下面看錯誤的情況: 

val accum= sc.accumulator(0, "error accumulator")

val data = sc.parallelize(1 to 10)

//用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1

val newdata = data.mapelse 1

}}//使用action操作觸發執行

newdata.count

//此時accum的值為5,是我們要的結果

accum.value

//繼續操作,檢視剛才變動的資料,foreach也是action操作

newdata.foreach(println)

//上個步驟沒有進行累計器操作,可是累加器此時的結果已經是10了

//這並不是我們想要的結果

accum.value12

3456

78910

1112

1314

1516

1718

19spark中的一系列transform操作會構成dag,此時需要通過乙個action操作來觸發,accumulator也是一樣。因此在乙個action操作之前,你呼叫value方法檢視其數值,肯定是沒有任何變化的。

所以在第一次count(action操作)之後,我們發現累加器的數值變成了5,是我們要的答案。

之後又對新產生的的newdata進行了一次foreach(action操作),其實這個時候又執行了一次map(transform)操作,所以累加器又增加了5。最終獲得的結果變成了10。

這種問題如何解決呢?看了上面的分析,大家都有這種印象了,那就是使用累加器的過程中只能使用一次action的操作才能保證結果的準確性。事實上,還是有解決方案的,只要將任務之間的依賴關係切斷就可以了。什麼方法有這種功能呢?你們肯定都想到了,cache,persist。呼叫這個方法的時候會將之前的依賴切除,後續的累加器就不會再被之前的transfrom操作影響到了。

//val accum= sc.accumulator(0, "error accumulator")

val data = sc.parallelize(1 to 10)

//**和上方相同

val newdata = data.map}

//使用cache快取資料,切斷依賴。

newdata.cache.count

//此時accum的值為5

accum.value

newdata.foreach(println)

//此時的accum依舊是5

accum.value12

3456

78910

1112

1314

總之,使用accumulator時,為了保證準確性,只使用一次action操作。

Spark的累加器

val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...

Spark廣播變數與累加器

在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...

Spark累加器和廣播變數

累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...