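1. job.waitForCompletion(true); submits the job from the driver
1) submit(): submits the job
(1) connect():
<1> return new Cluster(getConfiguration());
① initialize(jobTrackAddr, conf);
Through YarnClientProtocolProvider | LocalClientProtocolProvider, and based on the configuration parameters (mapreduce.framework.name),
it decides whether the current job runs locally or on YARN.
Result (local mode): LocalClientProtocolProvider ==> LocalJobRunner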
(2) return submitter.submitJobInternal(Job.this, cluster); submits the job
<1> . checkSpecs(job); checks the job's output specification (with FileOutputFormat, the output path must be set and must not already exist).
<2> . Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
determines the temporary staging directory for job submission
d:\tmp\hadoop\mapred\staging\administrator1777320722\.staging
<3> . JobID jobId = submitClient.getNewJobID(); generates an ID for the current job
<4> . Path submitJobDir = new Path(jobStagingArea, jobId.toString()); the job's submission path
d:/tmp/hadoop/mapred/staging/administrator1777320722/.staging/job_local1777320722_0001
<5> . copyAndConfigureFiles(job, submitJobDir);
① rUploader.uploadResources(job, jobSubmitDir);
[1] uploadResourcesInternal(job, submitJobDir);
submitJobDir = jtFs.makeQualified(submitJobDir);
mkdirs(jtFs, submitJobDir, mapredSysPerms);
creates the job's submission path
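<6> . int maps = writeSplits(job, submitJobDir); // generates the split information and returns the number of splits
<7> . conf.setInt(MRJobConfig.NUM_MAPS, maps); // sets the number of MapTasks to the number of splits
<8> . writeConf(conf, submitJobFile); // writes the current job's configuration to job.xml under the job submission path
The submission path now contains: job.split, job.splitmetainfo, job.xml, ***.jar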
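<9> . status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
// actually submits the job
<10> . jtFs.delete(submitJobDir, true); // deletes the job's temporary staging directory
(In JobSubmitter this call sits in a finally block and runs only if submission returned no status; after a successful submission, the staging directory is cleaned up once the job finishes.)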
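The top-level call in step 1 can be modeled roughly as follows. In Hadoop's `Job.waitForCompletion(verbose)`, the job is submitted only if it is still in the `DEFINE` state. With `verbose = true`, progress is printed through `monitorAndPrintJob()`. Otherwise the client polls `isComplete()` at a fixed interval and finally returns `isSuccessful()`. This sketch covers only the polling branch. `WaitSketch`, `JobHandle`, and the poll-interval parameter are hypothetical stand-ins, not Hadoop's `Job` API.

```java
// Hypothetical model of the non-verbose branch of waitForCompletion:
// submit once if the job is still being defined, then poll until complete.
// Not Hadoop code.
final class WaitSketch {
    enum State { DEFINE, RUNNING }

    // Minimal stand-in for the parts of a job the wait loop touches.
    interface JobHandle {
        State state();
        void submit();
        boolean isComplete();
        boolean isSuccessful();
    }

    static boolean waitForCompletion(JobHandle job, long pollMillis) throws InterruptedException {
        // Submit only if the job has not been submitted yet.
        if (job.state() == State.DEFINE) {
            job.submit();
        }
        // Sleep between completion checks instead of busy-waiting.
        while (!job.isComplete()) {
            Thread.sleep(pollMillis);
        }
        return job.isSuccessful();
    }
}
```

The driver's `return` value (and, typically, its exit code) comes from the final `isSuccessful()` call, after the loop sees the job complete.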
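For step (1) `connect()`, the provider choice can be sketched as a first-match search. Hadoop's `Cluster.initialize` loads the `ClientProtocolProvider` implementations through `java.util.ServiceLoader` and keeps the first one whose `create(...)` returns a non-null client. The local provider accepts `mapreduce.framework.name = local` (the default) and returns a `LocalJobRunner`. The YARN provider accepts `yarn` and returns a `YARNRunner`. The model below is a simplified, self-contained assumption of that logic. `Provider`, `ProviderSketch`, and the string results are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a client protocol provider; NOT a Hadoop class.
interface Provider {
    // Returns a client name if this provider handles the configured
    // framework, or null so the next provider gets a chance.
    String create(Map<String, String> conf);
}

// Hypothetical, simplified model of how Cluster.initialize picks a provider.
final class ProviderSketch {
    static final String FRAMEWORK_NAME = "mapreduce.framework.name";

    // Answers only for "local", which is also the default when the key is unset.
    static final Provider LOCAL = conf ->
            "local".equals(conf.getOrDefault(FRAMEWORK_NAME, "local"))
                    ? "LocalJobRunner" : null;

    // Answers only when the framework is explicitly set to "yarn".
    static final Provider YARN = conf ->
            "yarn".equals(conf.get(FRAMEWORK_NAME)) ? "YARNRunner" : null;

    // Walks the providers in order and keeps the first non-null client,
    // failing if no provider accepts the configuration.
    static String selectClient(List<Provider> providers, Map<String, String> conf) {
        for (Provider p : providers) {
            String client = p.create(conf);
            if (client != null) {
                return client;
            }
        }
        throw new IllegalStateException("Cannot initialize Cluster; check " + FRAMEWORK_NAME);
    }
}
```

With an empty configuration, `selectClient(List.of(ProviderSketch.YARN, ProviderSketch.LOCAL), Map.of())` falls through the YARN provider and returns `"LocalJobRunner"`, which matches the local-mode result noted in step (1).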
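Steps <2> to <4> of (2) produce the two example paths shown above. Judging from those paths, local mode appears to build three strings:

- The staging directory: `<staging root>/<user name><random id>/.staging`.
- The job ID: `job_local<random id>_<sequence number zero-padded to four digits>`.
- The submission directory: the staging directory with the job ID appended.

The sketch below rebuilds those strings under that assumption. `StagingSketch` and its methods are hypothetical, and the staging root `/tmp/hadoop/mapred/staging` is taken from the example path.

```java
// Hypothetical model of the local-mode naming seen in the example paths;
// not Hadoop code.
final class StagingSketch {
    // <stagingRoot>/<user><randomId>/.staging
    static String stagingDir(String stagingRoot, String user, int randomId) {
        return stagingRoot + "/" + user + randomId + "/.staging";
    }

    // job_local<randomId>_<sequence>, with the sequence zero-padded to 4 digits
    static String jobId(int randomId, int sequence) {
        return String.format("job_local%d_%04d", randomId, sequence);
    }

    // Submission directory = staging directory + "/" + job ID
    static String submitJobDir(String stagingDir, String jobId) {
        return stagingDir + "/" + jobId;
    }
}
```

With user `administrator`, random ID `1777320722`, and sequence `1`, these methods reproduce the two example paths (with forward slashes).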
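Step <5> qualifies the submission path and creates it with `mapredSysPerms`. In Hadoop that permission is `JobSubmissionFiles.JOB_DIR_PERMISSION`, i.e. `0700` (`rwx------`). The sketch below is only a local-filesystem analogue using `java.nio.file`: `toAbsolutePath()` stands in for `makeQualified`. `SubmitDirSketch` is a hypothetical name, not part of Hadoop.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Hypothetical local-filesystem analogue of "qualify the path, then mkdirs
// with owner-only permissions"; uses java.nio, not Hadoop's FileSystem API.
final class SubmitDirSketch {
    static Path createSubmitDir(Path submitJobDir) throws IOException {
        // Analogue of makeQualified: resolve to an absolute, normalized path.
        Path qualified = submitJobDir.toAbsolutePath().normalize();
        // Create the directory and any missing parents.
        Files.createDirectories(qualified);
        // Apply rwx------ (0700) only where the filesystem supports POSIX permissions.
        if (qualified.getFileSystem().supportedFileAttributeViews().contains("posix")) {
            Set<PosixFilePermission> ownerOnly = PosixFilePermissions.fromString("rwx------");
            Files.setPosixFilePermissions(qualified, ownerOnly);
        }
        return qualified;
    }
}
```

On Windows (as in the example `d:\` paths), the POSIX branch is skipped, and only the directory creation applies.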
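Step <6> returns the split count, and step <7> uses it as the number of MapTasks. For the default `FileInputFormat`, the rules are:

- The split size is `max(minSize, min(maxSize, blockSize))`, where the bounds come from `mapreduce.input.fileinputformat.split.minsize` / `.maxsize`.
- A file keeps producing full-size splits while the remaining bytes exceed 1.1 times the split size.
- The remainder becomes the last split.
- A zero-length file still yields one empty split.

The sketch below counts splits under those rules for splittable files only. `SplitCountSketch` and its method names are hypothetical. The 32 MB block size in the example is the usual local-filesystem default and is treated here as an assumption.

```java
// Hypothetical model of FileInputFormat-style split counting for splittable
// files; not Hadoop code.
final class SplitCountSketch {
    // The last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits for one file of the given length.
    static int splitsForFile(long length, long splitSize) {
        if (length == 0) {
            return 1; // a zero-length file still gets one empty split
        }
        int splits = 0;
        long bytesRemaining = length;
        // Cut full-size splits while the remainder is more than 1.1 splits' worth.
        while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the tail becomes the final split
        }
        return splits;
    }

    // Total splits over all input files, i.e. the number of MapTasks (step <7>).
    static int totalSplits(long[] fileLengths, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        int total = 0;
        for (long len : fileLengths) {
            total += splitsForFile(len, splitSize);
        }
        return total;
    }
}
```

With a 32 MB split size:

- A 100 MB file yields 4 splits (32 + 32 + 32 + 4 MB).
- A 35 MB file yields 1 split, because 35/32 ≈ 1.09 does not exceed 1.1.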
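Step <8> serializes the job's configuration into `job.xml` in the submission directory. Hadoop configuration files use a `<configuration>` root with one `<property>` element (holding `<name>` and `<value>`) per key. The key written in step <7> is `mapreduce.job.maps`, the name behind `MRJobConfig.NUM_MAPS`. The sketch below writes a plain `Map` in that shape. `ConfXmlSketch` is hypothetical and only mimics the file layout, not Hadoop's `Configuration.writeXml`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical writer that emits a Map in the layout of a Hadoop job.xml;
// not Hadoop's Configuration.writeXml.
final class ConfXmlSketch {
    // Escape characters that are special in XML text ('&' must go first).
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    static String toJobXml(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        sb.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
        sb.append("<configuration>\n");
        // Sort keys so the output is deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(conf).entrySet()) {
            sb.append("  <property><name>").append(escape(e.getKey()))
              .append("</name><value>").append(escape(e.getValue()))
              .append("</value></property>\n");
        }
        sb.append("</configuration>\n");
        return sb.toString();
    }
}
```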
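Steps <9> and <10> follow a submit-then-clean-up pattern. In `JobSubmitter.submitJobInternal`, the `submitJob` call sits inside a `try`. Its `finally` block deletes the submission directory only when no status came back, i.e. when submission failed. The sketch below models that control flow on the local filesystem. `SubmitCleanupSketch`, its `Submitter` interface, and the string status are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of "submit, and clean up the staging directory only if
// submission produced no status"; not Hadoop code.
final class SubmitCleanupSketch {
    interface Submitter {
        // Returns a status string, or null if the job could not be launched.
        String submit(Path submitJobDir) throws IOException;
    }

    static String submitWithCleanup(Submitter client, Path submitJobDir) throws IOException {
        String status = null;
        try {
            status = client.submit(submitJobDir);
            if (status == null) {
                throw new IOException("Could not launch job");
            }
            return status;
        } finally {
            // Only a failed submission removes the staging files here.
            if (status == null) {
                deleteRecursively(submitJobDir);
            }
        }
    }

    // Recursive delete (analogue of delete(dir, true)).
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse lexicographic order lists children before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```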