map-reduce的過程首先是由客戶端提交乙個任務開始的。
提交任務主要是通過jobclient.runjob(jobconf)靜態函式實現的:
public static runningjob runjob(jobconf job) throws ioexception finally finally finally
//建立reduce task
this.reduces = new taskinprogress[numreducetasks];
for (int i = 0; i < numreducetasks; i++) else if (action instanceof committaskaction) else else else finally else else finally finally while (kvfull);
} finally catch (mapbuffertoosmallexception e) else {
combinecollector.setwriter(writer);
combineandspill(kviter, combineinputcounter);
reducetask的run函式如下:
public void run(jobconf job, final taskumbilicalprotocol umbilical)
throws ioexception {
job.setboolean("mapred.skip.on", isskipping());
//對於reduce,則包含三個步驟:拷貝,排序,reduce
if (ismaporreduce()) {
copyphase = getprogress().addphase("copy");
sortphase = getprogress().addphase("sort");
reducephase = getprogress().addphase("reduce");
startcommunicationthread(umbilical);
final reporter reporter = getreporter(umbilical);
initialize(job, reporter);
//copy階段,主要使用reducecopier的fetchoutputs函式獲得map的輸出。建立多個執行緒mapoutputcopier,其中copyoutput進行拷貝。
boolean islocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!islocal) {
reducecopier = new reducecopier(umbilical, job);
if (!reducecopier.fetchoutputs()) {
copyphase.complete();
//sort階段,將得到的map輸出合併,直到檔案數小於io.sort.factor時停止,返回乙個iterator用於訪問key-value
setphase(taskstatus.phase.sort);
statusupdate(umbilical);
final filesystem rfs = filesystem.getlocal(job).getraw();
rawkeyvalueiterator riter = islocal
? merger.merge(job, rfs, job.getmapoutputkeyclass(),
job.getmapoutputvalueclass(), codec, getmapfiles(rfs, true),
!conf.getkeepfailedtaskfiles(), job.getint("io.sort.factor", 100),
new path(gettaskid().tostring()), job.getoutputkeycomparator(),
reporter)
: reducecopier.createkviterator(job, rfs, reporter);
mapoutputfilesondisk.clear();
sortphase.complete();
//reduce階段
setphase(taskstatus.phase.reduce);
reducer reducer = reflectionutils.newinstance(job.getreducerclass(), job);
class keyclass = job.getmapoutputkeyclass();
class valclass = job.getmapoutputvalueclass();
reducevaluesiterator values = isskipping() ?
new skippingreducevaluesiterator(riter,
job.getoutputvaluegroupingcomparator(), keyclass, valclass,
job, reporter, umbilical) :
new reducevaluesiterator(riter,
job.getoutputvaluegroupingcomparator(), keyclass, valclass,
job, reporter);
//逐個讀出key-value list,然後呼叫reducer的reduce函式
while (values.more()) {
reduceinputkeycounter.increment(1);
reducer.reduce(values.getkey(), values, collector, reporter);
values.nextkey();
values.informreduceprogress();
reducer.close();
out.close(reporter);
done(umbilical);
map-reduce的過程總結如下圖:
map reduce 過程的認識
map reduce 過程的認識 最初我一直簡單的以為map 的工作就是將資料打散,而reduce 就是將map 打散後的資料合併。雖然之前跑過wordcount 的例子,但之前只是對輸出reduce 最終的結果感興趣,對控制台列印的日誌資訊完全不懂。這幾天我們團隊在探索pagerank 才開始對m...
mapreduce具體解析
1.mapreduce作業執行流程 下面貼出我用visio2010畫出的流程示意圖 流程分析 1.在客戶端啟動乙個作業。2.向jobtracker請求乙個job id。3.將執行作業所需要的資源檔案複製到hdfs上,包括mapreduce程式打包的jar檔案 配置檔案和客戶端計算所得的輸入劃分資訊。...
Map Reduce過程概述
map reduce的過程首先是由客戶端提交乙個任務開始的。提交任務主要是通過jobclient.runjob jobconf 靜態函式實現的 public static runningjob runjob jobconf job throws ioexception finally finally...