reduce階段就是處理map的輸出資料,大部分過程和map差不多
1方法開始和maptask類似,包括initialize()初始化,根據情況看是否呼叫runjobcleanuptask(),2//
runjobsetuptask(),runtaskcleanuptask()。之後進入正式的工作,主要有這麼三個步驟:copy、sort、reduce。
3@override
4 @suppresswarnings("unchecked")
5public
void run(jobconf job, final
taskumbilicalprotocol umbilical)
6throws
ioexception, interruptedexception, classnotfoundexception
15//
start thread that will handle communication with parent
16//
設定並啟動reporter程序以便和tasktracker進行交流
17 taskreporter reporter = new
taskreporter(getprogress(), umbilical,
18jvmcontext);
19reporter.startcommunicationthread();
20//
在job client中初始化job時,預設就是用新的api,詳見job.setusenewapi()方法
21boolean usenewapi =job.getusenewreducer();
22/*
用來初始化任務,主要是進行一些和任務輸出相關的設定,比如建立commiter,設定工作目錄等
*/23 initialize(job, getjobid(), reporter, usenewapi);//
這裡將會處理輸出目錄
24/*
以下4個if語句均是根據任務型別的不同進行相應的操作,這些方 法均是task類的方法,所以與任務是maptask還是reducetask無關
*/25
//check if it is a cleanupjobtask
26if
(jobcleanup)
30if
(jobsetup)
35if
(taskcleanup)
4041
//initialize the codec
42 codec =initcodec();
4344
boolean islocal = "local".equals(job.get("mapred.job.tracker", "local")); //
判斷是否是單機hadoop
45if (!islocal)
53throw
new ioexception("task: " + gettaskid() +
54 " - the reduce copier failed", reducecopier.mergethrowable);55}
56}57 copyphase.complete(); //
copy is already complete
58setphase(taskstatus.phase.sort);
59statusupdate(umbilical);
6061
final filesystem rfs =filesystem.getlocal(job).getraw();
62//
2.sort(其實相當於合併).排序工作,就相當於上述排序工作的乙個延續。它會在所有的檔案都拷貝完畢後進行。
63//
使用工具類merger歸併所有的檔案。經過這乙個流程,乙個合併了所有所需map任務輸出檔案的新檔案產生了。
64//
而那些從其他各個伺服器網羅過來的 map任務輸出檔案,全部刪除了。
6566
//根據hadoop是否分布式來決定呼叫哪種排序方式
67 rawkeyvalueiterator riter =islocal
68 ?merger.merge(job, rfs, job.getmapoutputkeyclass(),
69 job.getmapoutputvalueclass(), codec, getmapfiles(rfs, true
),70 !conf.getkeepfailedtaskfiles(), job.getint("io.sort.factor", 100),
71new
path(gettaskid().tostring()), job.getoutputkeycomparator(),
72 reporter, spilledrecordscounter, null)73
: reducecopier.createkviterator(job, rfs, reporter);
7475
//free up the data structures
76mapoutputfilesondisk.clear();
7778 sortphase.complete(); //
sort is complete
79setphase(taskstatus.phase.reduce);
80statusupdate(umbilical);
81//
3.reduce 1.reduce任務的最後乙個階段。它會準備好map的 keyclass("mapred.output.key.class"或"mapred.mapoutput.key.class"),
82//
valueclass("mapred.mapoutput.value.class"或"mapred.output.value.class")
83//
和 comparator (「mapred.output.value.groupfn.class」或 「mapred.output.key.comparator.class」)
84 class keyclass =job.getmapoutputkeyclass();
85 class valueclass =job.getmapoutputvalueclass();
86 rawcomparator comparator =job.getoutputvaluegroupingcomparator();
87//
2.根據引數usenewapi判斷執行runnewreduce還是runoldreduce。分析潤runnewreduce
88if
(usenewapi) else
99done(umbilical, reporter);
100 }
1.reduce過程中三個大的階段比較重要:copy、sort、reduce;
2.codec = initcodec()這句是檢查map的輸出是否是壓縮的,壓縮的則返回壓縮codec例項,否則返回null,這裡討論不壓縮的;
4.done(umbilical, reporter)這個方法用於做結束任務的一些清理工作:更新計數器updatecounters();如果任務需要提交,設定taks狀態為commit_pending,並利用taskumbilicalprotocol,匯報task完成,等待提交,然後呼叫commit提交任務;設定任務結束標誌位;結束reporter通訊線程;傳送最後一次統計報告(通過sendlastupdate方法);利用taskumbilicalprotocol報告結束狀態(通過senddone方法)。
hadoop2 作業執行過程之reduce過程
reduce階段就是處理map的輸出資料,大部分過程和map差不多 1 方法開始和maptask類似,包括initialize 初始化,根據情況看是否呼叫runjobcleanuptask 2 runjobsetuptask runtaskcleanuptask 之後進入正式的工作,主要有這麼三個步...
Hadoop2 執行機制簡介
在上圖中resourcemanager支援分層級的應用佇列,這些佇列享有集群一定比例的資源。從某種意義上講它就是乙個純粹的排程器,它在執行過程中不對應用進行監控和狀態跟蹤。同樣,它也不能重啟因應用失敗或者硬體錯誤而執行失敗的任務。resourcemanager 是基於應用程式對資源的需求進行排程的 ...
hadoop2體系結構
hadoop1的核心組成是兩部分,即hdfs和mapreduce。在hadoop2中變為hdfs和yarn。新的hdfs中的namenode不再是只有乙個了,可以有多個 目前只支援2個 每乙個都有相同的職能。這兩個namenode的地位如何哪?答 乙個是active狀態的,乙個是standby狀態的...