Spark應用程式執行 階段的劃分

2022-06-16 11:24:08 字數 3334 閱讀 8857

​ sparkcontext物件包含有乙個私有屬性dagscheduler階段排程器,主要用於階段的劃分。在乙個應用程式中,任務的提交都是從行動運算元觸發的。行動運算元的方法內部會呼叫乙個runjob方法,其中就有dag排程器發揮執行job的作用:

dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)
​ runjob方法中,會執行submitjob方法:

val waiter = submitjob(rdd, func, partitions, callsite, resulthandler, properties)
val waiter = new jobwaiter[u](this, jobid, partitions.size, resulthandler)

eventprocessloop.post(jobsubmitted(

jobid, rdd, func2, partitions.toarray, callsite, waiter,

utils.cloneproperties(properties)))

​ 此處有乙個jobsubmitted事件,這個事件作為post方法的引數,該post方法主要用於將事件放入到乙個佇列中,然後等待事件執行緒執行佇列中的事件:

def post(event: e): unit =  else 

}}

​ 檢視這個事件執行緒eventthread,當這個事件執行緒執行的時候,會執行run方法,在方法的內部會取出事件佇列中的事件。

private[spark] val eventthread = new thread(name)  catch  catch }}

} catch

}}

override def onreceive(event: dagschedulerevent): unit =  finally 

}

​ 這是階段排程器的主要事件迴圈。該方法又將事件傳給了doonreceive方法,

private def doonreceive(event: dagschedulerevent): unit = event match
​ 該方法中包含模式匹配,jobsubmitted事件正好可以匹配到第一項,說白了就是dagscheduler類會向事件佇列傳送乙個訊息,訊息中包含的是事件,然後事件執行緒收到訊息之後會對訊息進行匹配。此處會執行handlejobsubmitted方法,檢視其原始碼,其中

private[scheduler] def handlejobsubmitted(jobid: int,

finalrdd: rdd[_],

func: (taskcontext, iterator[_]) => _,

partitions: array[int],

callsite: callsite,

listener: joblistener,

properties: properties): unit = catch

​ resultstage中包含的rdd就是執行行動運算元的那個rdd(下圖中黃色表示的那個),也就是最後的那個rdd(下圖中黃色圖表示的rdd)。parents是resultstage的上一級階段,parents是getorcreateparentstages方法的返回值。getorcreateparentstages用於獲取或者建立給定rdd的父階段列表。

private def getorcreateparentstages(rdd: rdd[_], firstjobid: int): list[stage] = .tolist

}

​ getshuffledependencies方法用於獲取給定rdd的shuffle依賴,其核心**如下:

private[scheduler] def getshuffledependencies(

rdd: rdd[_]): hashset[shuffledependency[_, _, _]] =

}} parents

}

​ 核心**用於判斷給定rdd的依賴關係是不是shuffle依賴,如果是則加入結果列表。最終返回的結果列表,會通過map方法將列表中的每乙個元素執行getorcreateshufflemapstage方法,該方法用於獲取或者建立shufflemap階段(寫磁碟之前的階段)。

​ getorcreateshufflemapstage(shuffledep, firstjobid)=>createshufflemapstage(shuffledep, firstjobid)

​ createshufflemapstage方法中會建立shufflemapstage物件,並當前rdd(呼叫行動運算元的那個)依賴的rdd(下圖紫色那個rdd)傳給這個物件。

def createshufflemapstage[k, v, c](

shuffledep: shuffledependency[k, v, c], jobid: int): shufflemapstage = {

val rdd = shuffledep.rdd

…… val numtasks = rdd.partitions.length

val parents = getorcreateparentstages(rdd, jobid)

val id = nextstageid.getandincrement()

val stage = new shufflemapstage(

id, rdd, numtasks, parents, jobid, rdd.creationsite, shuffledep, mapoutputtracker)

……

​ 此時,shufflemap階段(下圖紅色區域)就是result階段(藍色區域)的上一級階段。在上面的**中,我們還可以看到,如果當前shufflemap階段還有上一級階段,那麼getorcreateparentstages(rdd, jobid)方法還是會獲取它的上一級階段的,此時這個方法中的rdd就不再是最後乙個rdd,而是最後乙個rdd的前乙個rdd,也就是紫色表示的那個rdd。也就是說,階段的尋找是乙個不斷往前的過程,只要含有shuffle過程,那麼就會有新的階段。

spark應用程式的執行架構

spark應用程式的執行架構 幾個基本概念 1 job 包含多個task組成的平行計算,往往由action催生。2 stage job的排程單位。3 task 被送到某個executor上的工作單元。4 taskset 一組關聯的,相互之間沒有shuffle依賴關係的任務組成的任務集。乙個應用程式由...

Spark應用程式的執行架構

1 簡單的說 由driver向集群申請資源,集群分配資源,啟動ex ecutor。driver將spark應用程式的 和檔案傳送給executor。executor上執行task,執行完之後將結果返回給driver或者寫入外界。2 複雜點說 提交應用程式,構建sparkcontext,構建dag圖,...

spark應用程式的執行架構

幾個基本概念 1 job 包含多個task組成的平行計算,往往由action催生。2 stage job的排程單位。3 task 被送到某個executor上的工作單元。4 taskset 一組關聯的,相互之間沒有shuffle依賴關係的任務組成的任務集。乙個應用程式由乙個driver progra...