生產常用Spark累加器剖析之四

2021-09-27 05:18:31 字數 550 閱讀 2086

val acc = sc.accumulator(0, 「error accumulator」)

val data = sc.parallelize(1 to 10)

val newdata = data.map(x =>

})newdata.count

acc.value

newdata.foreach(println)

acc.value

上述現象,會造成acc.value的最終值變為10

spark中的一系列transform操作都會構造成一長串的任務鏈,此時就需要通過乙個action操作來觸發(lazy的特性),accumulator也是如此。

原因就在於第二次action操作的時候,又執行了一次累加器的操作,同個累加器,在原有的基礎上又加了5,從而變成了10

通過上述的現象描述,我們可以很快知道解決的方法:只進行一次action操作。基於此,我們只要切斷任務之間的依賴關係就可以了,即使用cache、persist。這樣操作之後,那麼後續的累加器操作就不會受前面的transform操作影響了

生產常用Spark累加器剖析之二

driver端 driver端初始化構建accumulator並初始化,同時完成了accumulator註冊,accumulators.register this 時accumulator會在序列化後傳送到executor端 driver接收到resulttask完成的狀態更新後,會去更新value...

Spark的累加器

val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...

Spark累加器 Accumulator 使用詳解

def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...