在之前的文章中,已經分析了stage的劃分演算法,這裡我們到原始碼裡面去看劃分演算法是怎麼實現的。
首先找到提交job的入口(從action操作開始,找到action操作的runjob -> dagscheduler.runjob -> submitjob -> eventprocessloop.jobsubmitted -> handlejobsubmitted)handlejobsubmitted()這個方法,下面我們具體分析原始碼
private
[scheduler] def handlejobsubmitted
(jobid: int,
finalrdd: rdd[_]
, func:
(taskcontext, iterator[_])=
> _,
partitions: array[int]
, callsite: callsite,
listener: joblistener,
properties: properties)
catch
// 用finalstage建立乙個job,裡面封裝了job的一些資訊(比如partition的數量,resultstage和shufflemapstage是不一樣的,這裡在效能調優的時候再講)
val job =
newactivejob
(jobid, finalstage, callsite, listener, properties)
// 清除rdd快取
clearcachelocs()
loginfo
("got job %s (%s) with %d output partitions"
.format
( job.jobid, callsite.shortform, partitions.length)
)loginfo
("final stage: "
+ finalstage +
" ("
+ finalstage.name +
")")
loginfo
("parents of final stage: "
+ finalstage.parents)
loginfo
("missing parents: "
+getmissingparentstages
(finalstage)
) val jobsubmissiontime = clock.
gettimemillis()
// 將job加入記憶體快取中
jobidtoactivejob
(jobid)
= job
activejobs += job
finalstage.
setactivejob
(job)
val stageids =
jobidtostageids
(jobid)
.toarray
val stageinfos = stageids.
flatmap
(id =
> stageidtostage.
get(id)
.map
(_.latestinfo)
) listenerbus.
post
(sparklistenerjobstart
(job.jobid, jobsubmissiontime, stageinfos, properties)
)// 提交finalstage
// 這個方法會導致第乙個stage被提交,並且其他stage,都放入了waitingstages裡了。
submitstage
(finalstage)
// 提交完第乙個stage0後,剩餘的stage,通過這個函式提交
submitwaitingstages()
}
上面的**中,首先使用觸發job的最後乙個rdd,建立乙個finalstage,這個stage是resultstage(乙個job裡面只有最後乙個stage是resultstage,其餘的都是shufflemapstage),然後建立job(裡面包含了job的一些資訊),並將job的相關資訊放入快取中,接著就建立了比較重要的submitstage(finalstage)方法,這個方法裡面就包含了stage的劃分和提交;而submitwaitingstages()則是提交剩餘的stage,下面我們分析一下submitstage(finalstage)方法。
private def submitstage
(stage: stage)
else
// 並且將當前stage,放入waitingstages等待佇列中
waitingstages += stage
}}}else
}
下面簡單說是如何進行stage的劃分的,看注釋,首先就是根據當前這個stage找到它的父stage,假如父stage的rdd與當前stage的rdd是寬依賴的關係,那麼就用這個寬依賴的rdd建立乙個shufflemapstage,並返回;假如不存在寬依賴,那麼就一直遍歷下去,直到第乙個rdd為止。我們看一下getmissingparentstages()方法。
private def getmissingparentstages
(stage: stage)
: list[stage]
=// 如果是窄依賴,那麼將rdd放入棧中
case narrowdep: narrowdependency[_]
=>
waitingforvisit.
push
(narrowdep.rdd)}}
}}}// 首先往棧中推入stage最後的乙個rdd
waitingforvisit.
push
(stage.rdd)
// 進行while迴圈
while
(waitingforvisit.nonempty)
missing.tolist
}
可以看到這個方法也是乙個遞迴呼叫,為了防止棧溢位,使用了乙個stack結構waitingforvisit,我們看內部函式visit()方法,它會遍歷當前rdd的依賴,假如存在shuffle依賴,那麼就建立乙個shufflemapstage,並返回,否則就一直執行下去,直到沒有父rdd為止,這也就意味著整個job只建立了乙個stage(resultstage)。
下面接著看submitstage的原始碼,通過getmissingparentstages()獲取當前stage的父stage,假如存在,就執行到下面這塊**,這塊**就是stage劃分演算法的推動者:
if
(missing.isempty)
else
// 並且將當前stage,放入waitingstages等待佇列中
waitingstages += stage
}
如果存在父stage,那麼遍歷父stage,並遞迴呼叫submitstage();我們以下面這幅經典的圖來說明:
如上面圖所示,首先以最後乙個rddg建立finalstage(stage3),接著通過**getmissingparentstages()**找它的父rdd,它有兩個父rdd,分別是rddf和rddb。先看rddb,它與rddg是窄依賴,接著遍歷rddb的父rdd,也即rdda,它們之間是groupbykey操作(發生了shuffle),是寬依賴,因此建立乙個stage1,這個遍歷結束;接著看rddf,它與rddg之間的操作是join,也發生了shuffle操作,因此建立了stage2;在遍歷完兩個父rdd之後,就返回getmissingparentstages()函式。這時候missing列表裡面包含了stage1和stage2。
下面判斷missing是否為空,由於包含了stage1和stage2,因此不為空,就接著遍歷。先遍歷stage1,由於stage1沒有依賴,因此它的missing是空,那麼這裡就呼叫submitmissingtasks()提交stage1;接著遍歷到stage2,stage2的rddf的父rdd中沒有寬依賴,因此它的missing列表也為空,提交stage2。
接著執行到waitingstages,它將finalstage,也就stage3加入這個等待佇列中。到這裡整個submitstage()就執行完成,只有最後乙個stage被加入了等待佇列中。
submitstage()執行完成之後,接著執行submitwaitingstages(),提交加入等待佇列的stage。
整個stage的劃分以及提交就結束了。以上就是stage的劃分以及提交過程,主要分析了一下流程,至於細節這塊,以後再慢慢研究,更新部落格。
總結一下,stage的劃分是以shuffle為界,也即寬依賴,如果rdd之間發生了shuffle,那麼就會以shuffle為界建立新的stage,依次內推。而stage的提交是遞迴提交,最先建立的stage,會最後提交,這剛好符合rdd的處理流程的先後順序。
IP位址歸劃
不管是學習網路還是上網,ip位址都是出現頻率非常高的詞。windows系統中設定ip位址的介面如圖1所示,圖中出現了ip位址 子網掩碼 預設閘道器和dns伺服器這幾個需要設定的地方,只有正確設定,網路才能通,那這些名詞都是什麼意思呢?學習ip位址的相關知識時還會遇到網路位址 廣播位址 子網等概念,這...
Ubuntu 劃詞翻譯
在 ubuntu 下可以自己寫指令碼實現乙個簡陋的版本。步驟如下 然後把以下 複製進乙個 notify translate.sh 檔案中,usr bin env bash need installed.need xsel or xclip installed.se xsel b n o tr n t...
劃重點 Python xlrd簡介
import xlrd data xlrd.open workbook r c users asus desktop 重新開始 python獲取excel資料 user1.xlsx print data.sheet names 獲取excel檔案所有sheet名字 table data.sheet ...