spark提交作業 呼叫action運算元 --> 呼叫 rdd 類的runjob方法 --> 呼叫 sparkcontext 類的 dagscheduler.runjob方法
--> dagscheduler.handlejobsubmitted 方法
生成 finalstage
finalstage = createresultstage()
submitstage(finalstage) // 在這個方法裡面體現出乙個分割槽對應乙個task。核心**是 partitionstocompute.map
-->createresultstage() 方法中 getorcreateparentstage(),這個方法裡面會算出每乙個shuffledependices,就是寬依賴,對每乙個寬依賴呼叫map運算元,建立乙個stage。就是說按照shuffle切分stage
(1) rdd.foreach()
(2) rdd}
(3) sparkcontext //類
runjob(rdd, func, 0 until rdd.partitions.length)
runjob(rdd, (ctx: taskcontext, it: iterator[t]) => cleanedfunc(it), partitions)
runjob[t, u](rdd, func, partitions, (index, res) => results(index) = res)
dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)
(4) dagscheduler
val waiter = submitjob(rdd, func, partitions, callsite, resulthandler, properties)
submitjob
private def doonreceive(event: dagschedulerevent): unit = event match
private[scheduler] def handlejobsubmitted
// 建立job
val job = new activejob(jobid, finalstage, callsite, listener, properties)
// 提交stage
submitstage(finalstage) }
private def createresultstage: resultstage =
private def getorcreateparentstages(rdd: rdd[_], firstjobid: int): list[stage] = .tolist
}private[scheduler] def getshuffledependencies(
rdd: rdd[_]): hashset[shuffledependency[_, _, _]] = }}
// 最終返回所有依賴
parents
} private def submitstage(stage: stage)
case stage: resultstage =>
partitionstocompute.map }}
......
// 提交任務
taskscheduler.submittasks(new taskset(tasks.toarray, stage.id, stage.latestinfo.attemptid, jobid, properties))
}}
Job提交流程原始碼
1.開始提交程式 boolean result job.waitforcompletion true 2.當job執行狀態為為define,提交job if state jobstate.define 3.確保job狀態 ensurestate jobstate.define 4.相容新舊api s...
Spark任務提交流程
spark任務提交流程挺複雜的,下面給乙個相對簡單的任務提交流程 driver程序啟動以後,首先構建sparkcontext,sparkcontext主要包含兩部分 dagscheduler和taskscheduler master接受到任務註冊資訊之後,根據自身資源呼叫演算法在spark集群的wo...
Spark任務提交流程
建立sparkcontext物件,其中包含dagscheduler和taskscheduler executor內部會建立執行task的執行緒池,然後把啟動的executor反向註冊給driver dagscheduler負責把spark作業轉化成stage的dag,根據寬窄依賴切分stage,然後...