// 1. 開始提交程式
boolean result = job.
waitforcompletion
(true);
// 2. 當job執行狀態為為define, 提交job
if(state == jobstate.define)
// 3. 確保job狀態
ensurestate
(jobstate.define)
;// 4. 相容新舊api
setusenewapi()
;// 5. 建立乙個連線, 判斷是本地連線還是集群連線
connect()
;// 5.1如若有集群的連線資訊, connect()方法結束, 否則. 使用配置資訊建立乙個新的集群連線
if(cluster == null)})
;}// 6 使用配置好的集群hdfs物件和客戶端物件建立乙個jobsubmitter
final jobsubmitter submitter =
getjobsubmitter
(cluster.
getfilesystem()
, cluster.
getclient()
);//7 submitter.submitjobinternal(job.this, cluster);
// 7.1 對輸出的命名空間進行檢查, 如果存在:
// 7.1.1 輸出路徑存在, 丟擲路徑存在異常
// 7.1.2 輸出路=路徑沒有設定, 丟擲路徑未設定異常
checkspecs
(job)
;// 7.2 構建建乙個能放臨時檔案的path物件.
fs.mkdirs
(stagingarea,
newfspermission
(job_dir_permission));
// 7.3 得到乙個唯一的jobid, 並設定
jobid jobid = submitclient.
getnewjobid()
;job.
setjobid
(jobid)
;// 7.4 將7.2和7.3的路徑組合起來形成乙個新的臨時檔案存放目錄
path submitjobdir =
newpath
(jobstagingarea, jobid.
tostring()
);// 7.4.1拷貝jar包到集群, 本地模式無此步驟
copyandconfigurefiles
(job, submitjobdir)
;ruploader.
uploadfiles
(job, jobsubmitdir)
;// 7.4.2 進行檔案輸入切片, 下一節詳細說
int maps =
writesplits
(job, submitjobdir)
;// 7.5 快取相關設定
tryfinally
}// 7.6 開始提交job
status = submitclient.
submitjob
( jobid, submitjobdir.
tostring()
, job.
getcredentials()
);// 7.6.1 當提交方式為 localjobrunner
job job =
newjob
(jobid.
downgrade
(jobid)
, jobsubmitdir)
;job.job.
setcredentials
(credentials)
;// 7.6.2 當提交方式為 yarnjobrunner, 將job提交給 resourcemanager
resmgrdelegate.
;// 7.7 監控任務狀態
monitorandprintjob()
;// 當執行成功 , 返回 issuccessful()
Job提交流程原始碼解析
1.job.waitforcompletion true 在driver中提交job 1 sumbit 提交 1 connect 1 return new cluster getconfiguration initialize jobtrackaddr,conf 通過yarnclientprotoc...
MR Job提交流程原始碼和切片原始碼詳解
waitforcompletion submit 1建立連線 connect 1 建立提交job的 newcluster getconfiguration 1 判斷是本地yarn還是遠端 initialize jobtrackaddr,conf 2 提交job submitter.submitjob...
Hadoop框架 job提交流程 本地模式
主要是將任務提交到集群中去並等待完成 boolean verbose 是否將進度列印給使用者看 return 任務成功返回true public boolean waitforcompletion boolean verbose throws ioexception,interruptedexcep...