spark作為分布式的大資料處理框架必然或涉及到大量的作業排程,如果能夠理解spark中的排程對我們編寫或優化spark程式都是有很大幫助的;
在spark中存在轉換操作(transformation operation)與行動操作(action operation)兩種;而轉換操作只是會從乙個rdd中生成另乙個rdd且是lazy的,spark中只有行動操作(action operation)才會觸發作業的提交,從而引發作業排程;在乙個計算任務中可能會多次呼叫 轉換操作這些操作生成的rdd可能存在著依賴關係,而由於轉換都是lazy所以當行動操作(action operation )觸發時才會有真正的rdd生成,這一系列的rdd中就存在著依賴關係形成乙個dag(directed acyclc graph),在spark中dagscheuler是基於dag的頂層排程模組;
1.1 作業排程關係圖
這裡根據spark原始碼跟蹤觸發action操作時觸發的job提交流程,count()是rdd中的乙個action操作所以呼叫count時會觸發job提交;
在rdd原始碼count()呼叫sparkcontext的runjob,在runjob方法中根據partitions(分割槽)大小建立arrays存放返回結果;
rdd.scala
/*** return the number of elements in the rdd.
*/def count(): long = sc.runjob(this, utils.getiteratorsize _).sum
sparkcontext.scala
def runjob[t, u: classtag](
rdd: rdd[t],
func: (taskcontext, iterator[t]) => u,
partitions: seq[int],
resulthandler: (int, u) => unit): unit =
dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)
}
在sparkcontext中將呼叫dagscheduler的runjob方法提交作業,dagscheduler主要任務是計算作業與任務依賴關係,處理呼叫邏輯;dagscheduler提供了submitjob與runjob方法用於 提交作業,runjob方法會一直等待作業完成,submitjob則返回jobwaiter物件可以用於判斷作業執行結果;
在runjob方法中將呼叫submitjob,在submitjob中把提交操作放入到事件迴圈佇列(dagschedulereventprocessloop)中;
def submitjob[t, u](
rdd: rdd[t],
func: (taskcontext, iterator[t]) => u,
partitions: seq[int],
callsite: callsite,
resulthandler: (int, u) => unit,
properties: properties): jobwaiter[u] =
在事件迴圈佇列中將呼叫eventprocessloop的onreceive方法;
提交作業時dagscheduler會從rdd依賴鏈尾部開始,遍歷整個依賴鏈劃分排程階段;劃分階段以shuffledependency為依據,當沒有shuffledependency時整個job 只會有乙個stage;在事件迴圈佇列中將會呼叫dagscheduler的handlejobsubmitted方法,此方法會拆分stage、提交stage;
private[scheduler] def handlejobsubmitted(jobid: int,
finalrdd: rdd[_],
func: (taskcontext, iterator[_]) => _,
partitions: array[int],
callsite: callsite,
listener: joblistener,
properties: properties)
在提交stage時會先呼叫getmissingparentstages獲取父階段stage,迭代該階段所依賴的父排程階段如果存在則先提交該父階段的stage 當不存在父stage或父stage執行完成時會對當前stage進行提交;
private def submitstage(stage: stage) else
waitingstages += stage}}
} ......
}
Spark作業排程
spark在standalone模式下,預設是使用fifo的模式,我們可以使用spark.cores.max來設定它的最大核心數,使用spark.executor.memory 來設定它的記憶體。在yarn模式下,使用 num workers設定worker的數量,使用 worker memory設...
Spark作業排程流程
spark首先會對job進行一系列的rdd轉換操作,並通過rdd之間的依賴關係構建dag 有向無環圖 然後根據rdd依賴關係將rdd劃分到不同的stage中,每個stage按照partition的數量建立多個task,最後將這些task提交到集群的work節點上執行。具體流程如下圖所示 通過rdd構...
spark作業提交失敗分析
提交乙個spark作業,報錯 error spark.sparkcontext error inilializing sparkcontext.再提交乙個yarn作業,hadoop jar opt cloudera parcels cdh 6.1 jars hadoop examples.jar p...