sparkcontext物件包含有乙個私有屬性dagscheduler階段排程器,主要用於階段的劃分。在乙個應用程式中,任務的提交都是從行動運算元觸發的。行動運算元的方法內部會呼叫乙個runjob方法,其中就有dag排程器發揮執行job的作用:
dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)
runjob方法中,會執行submitjob方法:
val waiter = submitjob(rdd, func, partitions, callsite, resulthandler, properties)
val waiter = new jobwaiter[u](this, jobid, partitions.size, resulthandler)
eventprocessloop.post(jobsubmitted(
jobid, rdd, func2, partitions.toarray, callsite, waiter,
utils.cloneproperties(properties)))
此處有乙個jobsubmitted事件,這個事件作為post方法的引數,該post方法主要用於將事件放入到乙個佇列中,然後等待事件執行緒執行佇列中的事件:
def post(event: e): unit = else
}}
檢視這個事件執行緒eventthread,當這個事件執行緒執行的時候,會執行run方法,在方法的內部會取出事件佇列中的事件。
private[spark] val eventthread = new thread(name) catch catch }}
} catch
}}
override def onreceive(event: dagschedulerevent): unit = finally
}
這是階段排程器的主要事件迴圈。該方法又將事件傳給了doonreceive方法,
private def doonreceive(event: dagschedulerevent): unit = event match
該方法中包含模式匹配,jobsubmitted事件正好可以匹配到第一項,說白了就是dagscheduler類會向事件佇列傳送乙個訊息,訊息中包含的是事件,然後事件執行緒收到訊息之後會對訊息進行匹配。此處會執行handlejobsubmitted方法,檢視其原始碼,其中
private[scheduler] def handlejobsubmitted(jobid: int,
finalrdd: rdd[_],
func: (taskcontext, iterator[_]) => _,
partitions: array[int],
callsite: callsite,
listener: joblistener,
properties: properties): unit = catch
resultstage中包含的rdd就是執行行動運算元的那個rdd(下圖中黃色表示的那個),也就是最後的那個rdd(下圖中黃色圖表示的rdd)。parents是resultstage的上一級階段,parents是getorcreateparentstages方法的返回值。getorcreateparentstages用於獲取或者建立給定rdd的父階段列表。
private def getorcreateparentstages(rdd: rdd[_], firstjobid: int): list[stage] = .tolist
}
getshuffledependencies方法用於獲取給定rdd的shuffle依賴,其核心**如下:
private[scheduler] def getshuffledependencies(
rdd: rdd[_]): hashset[shuffledependency[_, _, _]] =
}} parents
}
核心**用於判斷給定rdd的依賴關係是不是shuffle依賴,如果是則加入結果列表。最終返回的結果列表,會通過map方法將列表中的每乙個元素執行getorcreateshufflemapstage方法,該方法用於獲取或者建立shufflemap階段(寫磁碟之前的階段)。
getorcreateshufflemapstage(shuffledep, firstjobid)=>createshufflemapstage(shuffledep, firstjobid)
createshufflemapstage方法中會建立shufflemapstage物件,並當前rdd(呼叫行動運算元的那個)依賴的rdd(下圖紫色那個rdd)傳給這個物件。
def createshufflemapstage[k, v, c](
shuffledep: shuffledependency[k, v, c], jobid: int): shufflemapstage = {
val rdd = shuffledep.rdd
…… val numtasks = rdd.partitions.length
val parents = getorcreateparentstages(rdd, jobid)
val id = nextstageid.getandincrement()
val stage = new shufflemapstage(
id, rdd, numtasks, parents, jobid, rdd.creationsite, shuffledep, mapoutputtracker)
……
此時,shufflemap階段(下圖紅色區域)就是result階段(藍色區域)的上一級階段。在上面的**中,我們還可以看到,如果當前shufflemap階段還有上一級階段,那麼getorcreateparentstages(rdd, jobid)方法還是會獲取它的上一級階段的,此時這個方法中的rdd就不再是最後乙個rdd,而是最後乙個rdd的前乙個rdd,也就是紫色表示的那個rdd。也就是說,階段的尋找是乙個不斷往前的過程,只要含有shuffle過程,那麼就會有新的階段。
spark應用程式的執行架構
spark應用程式的執行架構 幾個基本概念 1 job 包含多個task組成的平行計算,往往由action催生。2 stage job的排程單位。3 task 被送到某個executor上的工作單元。4 taskset 一組關聯的,相互之間沒有shuffle依賴關係的任務組成的任務集。乙個應用程式由...
Spark應用程式的執行架構
1 簡單的說 由driver向集群申請資源,集群分配資源,啟動ex ecutor。driver將spark應用程式的 和檔案傳送給executor。executor上執行task,執行完之後將結果返回給driver或者寫入外界。2 複雜點說 提交應用程式,構建sparkcontext,構建dag圖,...
spark應用程式的執行架構
幾個基本概念 1 job 包含多個task組成的平行計算,往往由action催生。2 stage job的排程單位。3 task 被送到某個executor上的工作單元。4 taskset 一組關聯的,相互之間沒有shuffle依賴關係的任務組成的任務集。乙個應用程式由乙個driver progra...