driver端
driver端初始化構建accumulator並初始化,同時完成了accumulator註冊,accumulators.register(this)時accumulator會在序列化後傳送到executor端
driver接收到resulttask完成的狀態更新後,會去更新value的值 然後在action操作執行後就可以獲取到accumulator的值了
executor端
executor端接收到task之後會進行反序列化操作,反序列化得到rdd和function。同時在反序列化的同時也去反序列化accumulator(在readobject方法中完成),同時也會向taskcontext完成註冊
完成任務計算之後,隨著task結果一起返回給driver
driver端初始化
driver端主要經過以下步驟,完成初始化操作:
val accum = sparkcontext.accumulator(0, 「accumulatortest」)
val acc = new accumulator(initialvalue, param, some(name))
accumulators.register(this)
executor端反序列化得到accumulator反序列化是在呼叫resulttask的runtask方式時候做的操作:
// 會反序列化出來rdd和自己定義的function
val (rdd, func) = ser.deserialize[(rdd[t], (taskcontext, iterator[t]) => u)](
bytebuffer.wrap(taskbinary.value), thread.currentthread.getcontextclassloader)
在反序列化的過程中,會呼叫accumulable中的readobject方法:
private def readobject(in: objectinputstream): unit = utils.tryorioexception
}
注意
@volatile @transient private var value_ : r = initialvalue // current value on master
針對傳入function中不同的操作,對應有不同的呼叫方法,以下列舉幾種(在accumulator.scala中):
def += (term: t)
def add(term: t)
def ++= (term: r)
根據不同的累加器引數,有不同實現的accumulableparam(在accumulator.scala中):
trait accumulableparam[r, t] extends serializable
不同的實現如下圖所示:
以intaccumulatorparam為例:
implicit object intaccumulatorparam extends accumulatorparam[int]
我們發現intaccumulatorparam實現的是trait accumulatorparam[t]:
trait accumulatorparam[t] extends accumulableparam[t, t]
}
在各個節點上的累加操作完成之後,就會緊跟著返回更新之後的accumulators的value_值
在task.scala中的run方法,會執行如下:
// 返回累加器,並執行task
// 呼叫taskcontextimpl的collectaccumulators,返回值的型別為乙個map
(runtask(context), context.collectaccumulators())
在executor端已經完成了一系列操作,需要將它們的值返回到driver端進行聚合彙總,整個順序如圖累加器執行流程:
根據執行流程,我們可以發現,在執行完collectaccumulators方法之後,最終會在dagscheduler中呼叫updateaccumulators(event),而在該方法中會呼叫accumulators的add方法,從而完成聚合操作:
def add(values: map[long, any]): unit = synchronized
} else
}}
通過accum.value方法可以獲取到累加器的值
至此,累加器執行完畢。
生產常用Spark累加器剖析之四
val acc sc.accumulator 0,error accumulator val data sc.parallelize 1 to 10 val newdata data.map x newdata.count acc.value newdata.foreach println acc....
Spark的累加器
val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...
Spark累加器 Accumulator 使用詳解
def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...