dagscheduler的stage劃分演算法:會從觸發的action操作的那個rdd開始往前倒推,首先會為最後乙個rdd建立乙個stage,然後往前倒推的時候,如果發現對某個rdd是寬依賴,那麼就會將寬依賴的那個rdd建立乙個新的stage,那個rdd就是對新的stage的最後乙個rdd,然後依次類推,繼續往前倒推,根據寬窄依賴,進行stage的劃分,直到所有的rdd全部遍歷完了為之。
在**執行了運算元之後,比如count(),**依次如下
def
count
(): long = sc.runjob(this, utils.getiteratorsize _).sum
def runjob[t, u: classtag](rdd: rdd[t], func: iterator[t] => u): array[u] =
def
runjob[t, u: classtag](
rdd: rdd[t],
func: iterator[t] => u,
partitions: seq[int]): array[u] =
def
runjob[t, u: classtag](
rdd: rdd[t],
func: (taskcontext, iterator[t]) => u,
partitions: seq[int]): array[u] =
def
runjob[t, u: classtag](
rdd: rdd[t],
func: (taskcontext, iterator[t]) => u,
partitions: seq[int],
resulthandler: (int, u) => unit): unit =
val callsite = getcallsite
val cleanedfunc = clean(func)
loginfo("starting job: " + callsite.shortform)
if (conf.getboolean("spark.loglineage", false))
// 呼叫sparkcontext,之前初始化建立的dagscheduler的runjob()方法
dagscheduler.runjob(rdd, cleanedfunc, partitions, callsite, resulthandler, localproperties.get)
progressbar.foreach(_.finishall())
rdd.docheckpoint()
}
經過一系列的runjob呼叫,最後走到了具體功能實現的函式,
這個函式中最重要的就是dagscheduler.runjob()方法,接著進入dagscheduler的runjob函式,然後會呼叫submitjob() 函式,進入submitjob函式,dagschedulereventprocessloop 會post jobsubmitted的訊息。
private def
doonreceive
(event: dagschedulerevent): unit = event match catch
第一步:使用觸發job的最後乙個rdd,建立finalstage , 並且將stage漸入dagscheduler內部的記憶體快取區
finalstage
val job = new activejob(jobid, finalstage, callsite, listener, properties)
第二步,用finalstage建立乙個job, 就是說,這個job的最後乙個stage,當然就是我們的finalstage
jobidtoactivejob(jobid) =job
第三部,將job加入記憶體快取中
submitstage
(finalstage)
第四部,使用submitstage方法提交finalstage,這個方法的呼叫,其實會導致第乙個stage提交,並且導致其他所有的stage,都給放入waitingstage佇列裡。接下來我們看一下submitstage函式
// 其實就是stage劃分演算法的入口
// 但是,stage的劃分,其實就是由submitstage方法與getmissingparentstages方法共同組成的
private def submitstage(stage: stage) else
// 並且將當前stage,放入waitingstage等待執行的stage的佇列中
waitingstages += stage}}
} else
}
其中比較重要的函式是getmissingparentstages,進入函式內部
// 獲取某個stage的父stage
// 對乙個stage,如果它的最後乙個rdd的所有依賴都是窄依賴,那麼就不會建立任何新的stage
// 但是,只要發現這個stage的rdd寬依賴了某個rdd,那麼就用寬依賴的那個rdd,建立乙個新的stage
// 然後立即將新的stage返回
private
def getmissingparentstages(stage: stage): list[stage] =
// 如果是窄依賴,那麼將依賴的rdd放入棧
case narrowdep: narrowdependency[_] =>
waitingforvisit.push(narrowdep.rdd)}}
}}
}// 首先往棧中推入了stage最後乙個rdd
waitingforvisit.push(stage.rdd)
while (waitingforvisit.nonempty)
missing.tolist
}
ogg mysql的原理 OGG原理
ogg的資料整合技術實施主要含3程序 資料抽取程序 傳輸程序 應用程序 2個檔案 源資料庫 目標資料庫 1.出庫 投遞 入庫 啟動ogg程序 2.資料庫啟動歸檔模式sqlplus assysdbaarchiveloglist 3.建立gg使用者 4.oracle配置增量日誌 alterdatabas...
mvcc原理 Innodb的MVCC原理
該文章是 innodb的mvcc簡介 中的細節作出解釋。在mvcc出現之前的資料庫,為了實現一致性讀,如sqlserver,db2均採用鎖定讀技術,寫操作往往會阻塞讀操作,導致資料庫併發效能不高。oracle與postgre相繼推出自己的多版本併發控制技術,這一技術的核心是在發生讀寫衝突時候,讀操作...
ogg mysql的原理 OGG工作原理
一.goldengate介紹 ogg 是一種基於日誌的結構化資料複製軟體 ogg 能夠實現大量交易資料的實時捕捉,變換和投遞,實現源資料庫與目標資料庫的資料同步,保持最少10ms的資料延遲 二.工作原理 三.相關元件 1.manager 負責ogg 整體的監控和管理 1 trail檔案的生成和刪除 ...