val acc = sc.accumulator(0, 「error accumulator」)
val data = sc.parallelize(1 to 10)
val newdata = data.map(x =>
})newdata.count
acc.value
newdata.foreach(println)
acc.value
上述現象,會造成acc.value的最終值變為10
spark中的一系列transform操作都會構造成一長串的任務鏈,此時就需要通過乙個action操作來觸發(lazy的特性),accumulator也是如此。
原因就在於第二次action操作的時候,又執行了一次累加器的操作,同個累加器,在原有的基礎上又加了5,從而變成了10
通過上述的現象描述,我們可以很快知道解決的方法:只進行一次action操作。基於此,我們只要切斷任務之間的依賴關係就可以了,即使用cache、persist。這樣操作之後,那麼後續的累加器操作就不會受前面的transform操作影響了
生產常用Spark累加器剖析之二
driver端 driver端初始化構建accumulator並初始化,同時完成了accumulator註冊,accumulators.register this 時accumulator會在序列化後傳送到executor端 driver接收到resulttask完成的狀態更新後,會去更新value...
Spark的累加器
val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...
Spark累加器 Accumulator 使用詳解
def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...