生產常用Spark累加器剖析之二

2021-09-27 05:18:31 字數 2644 閱讀 2643

driver端

driver端初始化構建accumulator並初始化,同時完成了accumulator註冊,accumulators.register(this)時accumulator會在序列化後傳送到executor端

driver接收到resulttask完成的狀態更新後,會去更新value的值 然後在action操作執行後就可以獲取到accumulator的值了

executor端

executor端接收到task之後會進行反序列化操作,反序列化得到rdd和function。同時在反序列化的同時也去反序列化accumulator(在readobject方法中完成),同時也會向taskcontext完成註冊

完成任務計算之後,隨著task結果一起返回給driver

driver端初始化

driver端主要經過以下步驟,完成初始化操作:

val accum = sparkcontext.accumulator(0, 「accumulatortest」)

val acc = new accumulator(initialvalue, param, some(name))

accumulators.register(this)

executor端反序列化得到accumulator反序列化是在呼叫resulttask的runtask方式時候做的操作:

// 會反序列化出來rdd和自己定義的function

val (rdd, func) = ser.deserialize[(rdd[t], (taskcontext, iterator[t]) => u)](

bytebuffer.wrap(taskbinary.value), thread.currentthread.getcontextclassloader)

在反序列化的過程中,會呼叫accumulable中的readobject方法:

private def readobject(in: objectinputstream): unit = utils.tryorioexception 

}

注意

@volatile @transient private var value_ : r = initialvalue // current value on master
針對傳入function中不同的操作,對應有不同的呼叫方法,以下列舉幾種(在accumulator.scala中):

def += (term: t) 

def add(term: t)

def ++= (term: r)

根據不同的累加器引數,有不同實現的accumulableparam(在accumulator.scala中):

trait accumulableparam[r, t] extends serializable
不同的實現如下圖所示:

以intaccumulatorparam為例:

implicit object intaccumulatorparam extends accumulatorparam[int]
我們發現intaccumulatorparam實現的是trait accumulatorparam[t]:

trait accumulatorparam[t] extends accumulableparam[t, t] 

}

在各個節點上的累加操作完成之後,就會緊跟著返回更新之後的accumulators的value_值

在task.scala中的run方法,會執行如下:

// 返回累加器,並執行task

// 呼叫taskcontextimpl的collectaccumulators,返回值的型別為乙個map

(runtask(context), context.collectaccumulators())

在executor端已經完成了一系列操作,需要將它們的值返回到driver端進行聚合彙總,整個順序如圖累加器執行流程:

根據執行流程,我們可以發現,在執行完collectaccumulators方法之後,最終會在dagscheduler中呼叫updateaccumulators(event),而在該方法中會呼叫accumulators的add方法,從而完成聚合操作:

def add(values: map[long, any]): unit = synchronized 

} else

}}

通過accum.value方法可以獲取到累加器的值

至此,累加器執行完畢。

生產常用Spark累加器剖析之四

val acc sc.accumulator 0,error accumulator val data sc.parallelize 1 to 10 val newdata data.map x newdata.count acc.value newdata.foreach println acc....

Spark的累加器

val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...

Spark累加器 Accumulator 使用詳解

def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...