1.spark-submit 指令碼,在指令碼裡呼叫了org.apache.spark.deloy.sparksubmit 類
2.sparksubmit.scala main方法
override def main(args: array[string]): unit =
}}
private def submit(args: sparksubmitarguments): unit = catch was not a rest server. " +
"falling back to legacy submission gateway instead.")
args.userest = false
submit(args)
}// in all other modes, just run the main class as prepared
} else
}
4.sparksubmit.scala runmain()
該方法主要是 確定mainclass,使用classfromname,獲取類物件,然後採用對映呼叫main方法
5.client main方法
在main方法中,new clientendpoint 物件建立過程中,會向master傳送registerdriver訊息。
object client
// scalastyle:on println
val conf = new sparkconf()
val driverargs = new clientarguments(args)
if (!conf.contains("spark.rpc.asktimeout"))
logger.getrootlogger.setlevel(driverargs.loglevel)
val rpcenv =
rpcenv.create("driverclient", utils.localhostname(), 0, conf, new securitymanager(conf))
val masterendpoints = driverargs.masters.map(rpcaddress.fromsparkurl).
map(rpcenv.setupendpointref(_, master.endpoint_name))
rpcenv.setupendpoint("client", new clientendpoint(rpcenv, driverargs, masterendpoints, conf))
rpcenv.awaittermination()
}}
6. 之後就是driver 啟動,sparkcontext初始化的過程了
2.worker 在啟動executor的時候,先new executorrunner,runner不是程序也不是執行緒,只是乙個物件,在runner.start()中,使用執行緒非同步 啟動了乙個執行緒,該執行緒用於啟動executorbackend.
3.executor執行結束之後,使用backend.updatestatus() 向schedulerbackend 傳送訊息,schedulerbackend 的receive中,會把結果交給taskschedular進行處理,然後按照處理的結果在進行相關操作,比如,如果執行成功不需要重試,那麼schedulerbackend,就會把cores加到freecores中,然後呼叫makeoffers() 重新進行task的資源分配,看有沒有滿足資源條件的task可以執行。
job提交之後,呼叫runjob,到最終task被分配到executor之前所涉及到的排程相關
1.首先涉及到的排程是job stage 劃分和提交過程,也就是submitstage方法,所有又依賴的satge,也就是說有父satge的子stage,子stage呼叫submitsatge的時候,會將子satge新增到watingsatge佇列中,換句話說,如果乙個stage有父依賴,那麼他就不能被subnitmissingsatge submit,會被加入到watingsatge,只有沒有依賴的satge才會被提交。
沒有依賴的stage提交,會將satge轉換成tasksetmanager,提交給taskscheduar
2.taskschedular在初始化的時候,方法位於sparkcontext中,初始化的時候初始化了乙個佇列,這個佇列有兩個選擇:fifo/fair,
tasksetmanager提交給taskschedular的時候就會加入到該佇列中,比如fifo佇列,有兩層排序,一層是根據jobid,jobid越小的優先順序越高,同一job內部,存在第二層排序,stageid,stageid越小的優先順序越高
值得足以的一點就是:stage提交的時候,有依賴,就不會新增到佇列中,會加入到watingsatge中,等待某乙個stage完成之後,會檢查watingsatge提交已經沒有依賴的stage
Spark原始碼分析 Spark整體架構
術語 描述使用者編寫的程式。driver端的sparkcontext sparkconf和執行在executors上使用者編寫的業務邏輯 即map reduce reducebykey等 driver 執行使用者編寫應用程式的main 方法並建立sparkcontext worker 具體執行應用程...
spark原始碼之TaskScheduler解讀
1 spark任務的真正的執行時由action運算元進行乙個觸發,最終呼叫sc.runjob方法,在driver端會初始化2個重要的組建dagscheduler和taskscheduler,a taskscheduler的主要職責 a.1負責將dagscheduler傳送過來的的taskset放入到...
spark原始碼剖析 RDD相關原始碼閱讀筆記
最好的原始碼閱讀方法就是除錯,沒有之一 之前其實有閱讀過rdd相關的原始碼,最近學習過程中發現在之前原本閱讀過的模組中有一些 關節 並沒有打通,所以想通過除錯的方式來更細緻得學習原始碼。本文為編寫測試用例並除錯rdd相關模組的筆記,並沒有列出具體的除錯過程,僅列出結論以做備忘,特別是那些比較容易忽略...