/**
* 主要是將任務提交到集群中去並等待完成
* boolean verbose:是否將進度列印給使用者看
* return 任務成功返回true
*/public boolean waitforcompletion(boolean verbose
) throws ioexception, interruptedexception,
classnotfoundexception
if (verbose) else catch (interruptedexception ie) }}
return issuccessful();
}
public void submit()
throws ioexception, interruptedexception, classnotfoundexception
});state = jobstate.running;
log.info("the url to track the job: " + gettrackingurl());
}
private synchronized void connect()
throws ioexception, interruptedexception, classnotfoundexception
});}
} /**
* public cluster(configuration conf)
* 主要作用是獲取你在driver中configuration配置的檔案資訊,沒有配置使用預設
*/public cluster(configuration conf) throws ioexception
/*** 呼叫initialize(jobtrackaddr, conf)返回的值
* jobtrackaddr:local還是yarn
* conf:配置的資訊
*/public cluster(inetsocketaddress jobtrackaddr, configuration conf)
throws ioexception
/*** ...表示該方法中的校驗**
* jobtrackaddr:狀態
* conf:配置資訊
*/private void initialize(inetsocketaddress jobtrackaddr, configuration conf)throws ioexception else
...} /**
* yarn是否和driver中配置的conf.set("mapreduce.framework.name","yarn");
* 相同,相同的話就返回乙個yarnrunner物件,沒有配置的話就返回乙個null,而後進行
* 第二次判斷
*/public clientprotocol create(configuration conf) throws ioexception
/*** 之前返回為null的話就進行第二次判斷
*/public clientprotocol create(configuration conf) throws ioexception
conf.setint(jobcontext.num_maps, 1);
return new localjobrunner(conf);
} // --> submint() --> return submitter.submitjobinternal(job.this, cluster);
總結:connect方法最終要的地方就是,為我們建立了乙個關鍵的物件localjobrunner物件,這個物件為我們之後提交作業所用,很重要。
jobstatus submitjobinternal(job job, cluster cluster)throws classnotfoundexception, interruptedexception, ioexception {
/*校驗檔案輸出路徑是否在driver中配置,如果沒有配置丟擲invalidjobconfexception,如果檔案路徑存在丟擲 filealreadyexception
*/checkspecs(job);
// 獲取conf配置
configuration conf = job.getconfiguration();
addmrframeworktodistributedcache(conf);
path jobstagingarea = jobsubmissionfiles.getstagingdir(cluster, conf);
// 配置校驗資訊我就用...代替了,太多容易把眼睛看花,感興趣的朋友可以用debug邊跳邊看裡面的具體資訊。
...// 建立jobid 也就是8088埠中 任務的id
jobid jobid = submitclient.getnewjobid();
// 獲得jobid設定到job裡
job.setjobid(jobid);
// 將jobstagingarea和jobid拼在一起,拼成乙個提交資訊(配置資訊,切片資訊,jar包)的路徑
path submitjobdir = new path(jobstagingarea, jobid.tostring());
...// 拷貝jar包到集群(本地模式看不到,在向集群提交的時候才能看到jar包)
copyandconfigurefiles(job, submitjobdir);
// 會在submitjobdir目錄下建立乙個job.xml檔案
path submitjobfile = jobsubmissionfiles.getjobconfpath(submitjobdir);
log.debug("creating splits at " + jtfs.makequalified(submitjobdir));
/* 切片,具體怎麼切,再看切片原始碼的時候會提到,在執行完該方法後submitjobdir路徑中會多出split和crc 檔案
*/int maps = writesplits(job, submitjobdir);
Job提交流程原始碼
1.開始提交程式 boolean result job.waitforcompletion true 2.當job執行狀態為為define,提交job if state jobstate.define 3.確保job狀態 ensurestate jobstate.define 4.相容新舊api s...
Job提交流程原始碼解析
1.job.waitforcompletion true 在driver中提交job 1 sumbit 提交 1 connect 1 return new cluster getconfiguration initialize jobtrackaddr,conf 通過yarnclientprotoc...
Spark的Job提交流程以及相關知識
spark提交作業 呼叫action運算元 呼叫 rdd 類的runjob方法 呼叫 sparkcontext 類的 dagscheduler.runjob方法 dagscheduler.handlejobsubmitted 方法 生成 finalstage finalstage createres...