client 提交應用,master節點啟動driver
sparkcontext向clustermanager申請executor資源,worker會先例項化executorrunner物件,在executorrunner啟動中會建立程序生成器processbuilder,然後由該生成器建立coarsegrainedexecutorbackend物件。
spark應用程式中會有各種轉換操作,會通過行動操作觸發job。job提交之後,會依據rdd之間的依賴關係構建dag圖。dag圖構建好之後,會交給dagscheduler進行解析。
dagscheduler是面向排程階段的高層次排程器。他會把dag拆分成相互依賴的排程階段(stage),stage是以rdd的依賴是否為寬依賴。當遇到寬依賴,就劃分為新的排程階段,每個排程階段包含乙個或者多個任務,這些任務形成任務集(task set)。dagscheduler會記錄哪些rdd被存入磁碟等物化操作,同時還會尋求任務的最優化排程(資料本地性)。dagscheduler會監控排程階段的執行過程,如果某個階段執行失敗,就會重新提交該階段。
dagscheduler會將taskset提交給taskscheduler。每個taskscheduler只為乙個sparkcontext服務,taskscheduler接收來自dagscheduler傳送來的任務集,taskscheduler接收到任務集後,會把任務集中以任務的形式乙個個分發到集群worker節點的executor中執行。如果任務執行失敗,taskscheduler要負責重試。如果某個任務一直執行不完,就可能啟動多個節點執行同乙個任務。
worker中的executor收到taskscheduler傳送過來的任務後,以多執行緒的方式執行。每個執行緒負責乙個任務。任務結束後需要返回給taskscheduler,不同型別的任務,返回的方式也不同。shufflemap task 返回的是乙個map status物件,而不是結果本身。result task會返回結果。
在返回結果時,對於executor的計算結果
1. 生成結果大小在(∞,
1gb)
(\infty, 1gb)
(∞,1gb
):直接丟棄。該配置項可以通過spark.driver.maxresultsize進行設定
2. 生成結果大小在(1g
b,
128mb−
200kb)
(1gb, 128mb-200kb)
(1gb,1
28mb
−200
kb):會把結果所在的taskid儲存至blockmanager中,然後將該編號通過netty傳輸給driver終端點。該閾值是netty框架的傳輸最大值spark.akka.framesize(預設是128mb)和netty的預留空間reservedsizebytes(200kb)的差值。
3. 生成結果大小在(
128mb−
200kb,
0)
(128mb-200kb, 0)
(128mb
−200
kb,0
):通過netty直接將結果傳送到driver終端點。
同時,taskrunner將任務的執行結果傳送給driverendpoint。該終端點會轉給taskschedulerimpl的stateupdate進行處理。
如果返回狀態是taskstate.finished,那麼呼叫taskresultgetter的enqueuesuccessfultask方法進行處理。如果是indirecttaskresult,就會通過blockid進行獲取:sparkenv.blockmanager.getremotebytes(blockid);如果是directresult,則可以直接獲取結果。
如果返回狀態時taskstate.failed、taskstate.killed或者是taskstate.lost,呼叫taskresultgetter的enqueuefailedtask進行處理。對於taskstate.lost,還需要將其所在的executor標記為failed,並根據更新後的executor重新進行排程。
shufflemap task還會涉及到shuffle過程。
standalone模式下,clustermanager即為master。在yarn下,clustermanager為資源管理器
driver program可以在master上執行,此時driver就在master節點上。如果是yarn集群,那麼driver可能被排程到worker node上執行。為了防止driver和executor間通訊過慢,一般原則上要使它們分布在同乙個區域網中
result task過後,若作業已完成,則標記已完成。
Spark作業排程流程
spark首先會對job進行一系列的rdd轉換操作,並通過rdd之間的依賴關係構建dag 有向無環圖 然後根據rdd依賴關係將rdd劃分到不同的stage中,每個stage按照partition的數量建立多個task,最後將這些task提交到集群的work節點上執行。具體流程如下圖所示 通過rdd構...
spark資源排程和任務排程
資源排程 1 executor預設在集群中分散啟動,可通過引數配置集中在某個work啟動,不過分散啟動有利於資料本地化。2 如果spark submit提交任務時,如果不指定 executor cores,則spark會在每個work中啟動乙個executor並消耗掉work中的所有core和1g的...
Spark的執行基本流程以及任務排程機制
cluster 模式用於監控和排程的 driver 模組啟動在 yarn 集群中執行,一般用於生產環境當中。excutor 程序啟動後會向 driver 進行反向註冊 內部通訊時 excutorbackend 向 excutor 全部註冊完成後driver 開始執行 main 函式 之後執行到 ac...