Job Submission Flow: Source Code Walkthrough

2021-10-08 15:02:59

1. job.waitForCompletion(true); submits the job from the driver

1) submit(): submit the job

(1) connect():

<1> return new Cluster(getConfiguration());

① initialize(jobTrackAddr, conf);

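Using YarnClientProtocolProvider or LocalClientProtocolProvider, decide from the parameters in the configuration files

whether the current job should run locally or on YARN.

Result in this (local) run: LocalClientProtocolProvider ==> LocalJobRunner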

(2) return submitter.submitJobInternal(Job.this, cluster); submit the job

<1> . checkSpecs(job); check the job's output path.

<2> . Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

Get the temporary (staging) directory for job submission, for example:

d:\tmp\hadoop\mapred\staging\administrator1777320722\.staging

<3> . JobID jobId = submitClient.getNewJobID(); generate an ID for the current job

<4> . Path submitJobDir = new Path(jobStagingArea, jobId.toString()); the job's submit path, for example:

d:/tmp/hadoop/mapred/staging/administrator1777320722/.staging/job_local1777320722_0001

<5> . copyAndConfigureFiles(job, submitJobDir);

① rUploader.uploadResources(job, jobSubmitDir);

[1] uploadResourcesInternal(job, submitJobDir);

submitJobDir = jtFs.makeQualified(submitJobDir);

mkdirs(jtFs, submitJobDir, mapredSysPerms);

Create the job's submit directory

<6> . int maps = writeSplits(job, submitJobDir); // generate the split information and return the number of splits

<7> . conf.setInt(MRJobConfig.NUM_MAPS, maps); // set the number of MapTasks from the number of splits

<8> . writeConf(conf, submitJobFile); // write the current job's configuration to the job submit path

The submit path now contains: job.split, job.splitmetainfo, job.xml, ***.jar

<9> . status = submitClient.submitJob(

jobId, submitJobDir.toString(), job.getCredentials());

// the actual job submission

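<10> . jtFs.delete(submitJobDir, true); // delete the job's temporary working directory; in the Hadoop source this call sits in a finally block and runs only when status is still null, that is, when the submission did not succeed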
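To make step (1) more concrete, here is a minimal sketch of how the choice between local and YARN execution could be modeled. It does not use Hadoop classes: `ProviderSelectionSketch` and its nested `Provider` interface are hypothetical stand-ins, and the runner names are returned as plain strings. The assumption that the choice is driven by the `mapreduce.framework.name` property, defaulting to `local`, comes from my understanding of Hadoop and is not stated in the walkthrough above.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free model of the provider selection done in initialize(jobTrackAddr, conf).
class ProviderSelectionSketch {
    // Assumed configuration key; the walkthrough only says "parameters in the configuration files".
    static final String FRAMEWORK_NAME = "mapreduce.framework.name";

    // Stand-in for a ClientProtocolProvider: returns a runner name, or null if it does not apply.
    interface Provider {
        String create(Map<String, String> conf);
    }

    // Mirrors the idea of YarnClientProtocolProvider: only answers when the framework is "yarn".
    static final Provider YARN = conf ->
        "yarn".equals(conf.getOrDefault(FRAMEWORK_NAME, "local")) ? "YARNRunner" : null;

    // Mirrors the idea of LocalClientProtocolProvider: only answers when the framework is "local".
    static final Provider LOCAL = conf ->
        "local".equals(conf.getOrDefault(FRAMEWORK_NAME, "local")) ? "LocalJobRunner" : null;

    // Ask each provider in turn and keep the first one that returns a runner.
    static String initialize(Map<String, String> conf) {
        for (Provider provider : List.of(YARN, LOCAL)) {
            String runner = provider.create(conf);
            if (runner != null) {
                return runner;
            }
        }
        throw new IllegalStateException("Cannot initialize cluster, check " + FRAMEWORK_NAME);
    }

    public static void main(String[] args) {
        System.out.println(initialize(Map.of()));
        System.out.println(initialize(Map.of(FRAMEWORK_NAME, "yarn")));
    }
}
```

With an empty configuration the sketch falls through to the local provider, which matches the LocalClientProtocolProvider ==> LocalJobRunner result noted in step (1).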

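Steps <2> to <5> and <10> can also be sketched without Hadoop: build the submit directory as staging area plus job ID, write a placeholder job.xml into it, and clean the directory up in a finally block when no status was obtained. Everything in `StagingDirSketch` is hypothetical (the class, its method names, the `"RUNNING"` status string, and the `failSubmission` flag used to simulate an error); it uses `java.nio.file` rather than Hadoop's `FileSystem`, and the job ID format is only inferred from the example path job_local1777320722_0001.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, Hadoop-free model of the staging directory handling in submitJobInternal.
class StagingDirSketch {
    // Format inferred from the example path: "job_" + identifier + "_" + 4-digit sequence number.
    static String jobId(String identifier, int seq) {
        return String.format("job_%s_%04d", identifier, seq);
    }

    // Creates a throwaway staging area so the sketch never touches a real directory.
    static Path tempStagingArea() {
        try {
            return Files.createTempDirectory("staging");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates the submit dir, writes a placeholder job.xml, and cleans up if no status was set.
    static Path submit(Path stagingArea, String jobId, boolean failSubmission) {
        Path submitJobDir = stagingArea.resolve(jobId);
        String status = null;
        try {
            Files.createDirectories(submitJobDir);
            Files.writeString(submitJobDir.resolve("job.xml"), "<configuration/>");
            if (failSubmission) {
                throw new IOException("simulated submission failure");
            }
            status = "RUNNING";
        } catch (IOException e) {
            System.out.println("submission failed: " + e.getMessage());
        } finally {
            // Same shape as step <10>: delete the submit dir only when status is still null.
            if (status == null) {
                deleteRecursively(submitJobDir);
            }
        }
        return submitJobDir;
    }

    // Deletes children before parents by walking the tree in reverse path order.
    static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try {
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(dir)) {
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path area = tempStagingArea();
        Path ok = submit(area, jobId("local1777320722", 1), false);
        Path failed = submit(area, jobId("local1777320722", 2), true);
        System.out.println(ok.getFileName() + " exists: " + Files.exists(ok));
        System.out.println(failed.getFileName() + " exists: " + Files.exists(failed));
    }
}
```

The successful submission leaves its directory in place, while the simulated failure removes it, which is the behavior the finally block in step <10> is meant to illustrate.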