hadoop2 作業執行過程之reduce過程

2022-04-07 00:09:45 字數 3947 閱讀 8053

reduce階段就是處理map的輸出資料,大部分過程和map差不多

1

方法開始和maptask類似,包括initialize()初始化,根據情況看是否呼叫runjobcleanuptask(),2//

runjobsetuptask(),runtaskcleanuptask()。之後進入正式的工作,主要有這麼三個步驟:copy、sort、reduce。

3@override

4 @suppresswarnings("unchecked")

5public

void run(jobconf job, final

taskumbilicalprotocol umbilical)

6throws

ioexception, interruptedexception, classnotfoundexception

15//

start thread that will handle communication with parent

16//

設定並啟動reporter程序以便和tasktracker進行交流

17 taskreporter reporter = new

taskreporter(getprogress(), umbilical,

18jvmcontext);

19reporter.startcommunicationthread();

20//

在job client中初始化job時,預設就是用新的api,詳見job.setusenewapi()方法

21boolean usenewapi =job.getusenewreducer();

22/*

用來初始化任務,主要是進行一些和任務輸出相關的設定,比如建立commiter,設定工作目錄等

*/23 initialize(job, getjobid(), reporter, usenewapi);//

這裡將會處理輸出目錄

24/*

以下4個if語句均是根據任務型別的不同進行相應的操作,這些方 法均是task類的方法,所以與任務是maptask還是reducetask無關

*/25

//check if it is a cleanupjobtask

26if

(jobcleanup)

30if

(jobsetup)

35if

(taskcleanup)

4041

//initialize the codec

42 codec =initcodec();

4344

boolean islocal = "local".equals(job.get("mapred.job.tracker", "local"));  //

判斷是否是單機hadoop

45if (!islocal)

53throw

new ioexception("task: " + gettaskid() +

54 " - the reduce copier failed", reducecopier.mergethrowable);55}

56}57 copyphase.complete(); //

copy is already complete

58setphase(taskstatus.phase.sort);

59statusupdate(umbilical);

6061

final filesystem rfs =filesystem.getlocal(job).getraw();

62//

2.sort(其實相當於合併).排序工作,就相當於上述排序工作的乙個延續。它會在所有的檔案都拷貝完畢後進行。

63//

使用工具類merger歸併所有的檔案。經過這乙個流程,乙個合併了所有所需map任務輸出檔案的新檔案產生了。

64//

而那些從其他各個伺服器網羅過來的 map任務輸出檔案,全部刪除了。

6566

//根據hadoop是否分布式來決定呼叫哪種排序方式

67 rawkeyvalueiterator riter =islocal

68 ?merger.merge(job, rfs, job.getmapoutputkeyclass(),

69 job.getmapoutputvalueclass(), codec, getmapfiles(rfs, true

),70 !conf.getkeepfailedtaskfiles(), job.getint("io.sort.factor", 100),

71new

path(gettaskid().tostring()), job.getoutputkeycomparator(),

72 reporter, spilledrecordscounter, null)73

: reducecopier.createkviterator(job, rfs, reporter);

7475

//free up the data structures

76mapoutputfilesondisk.clear();

7778 sortphase.complete(); //

sort is complete

79setphase(taskstatus.phase.reduce);

80statusupdate(umbilical);

81//

3.reduce 1.reduce任務的最後乙個階段。它會準備好map的 keyclass("mapred.output.key.class"或"mapred.mapoutput.key.class"),

82//

valueclass("mapred.mapoutput.value.class"或"mapred.output.value.class")

83//

和 comparator (「mapred.output.value.groupfn.class」或 「mapred.output.key.comparator.class」)

84 class keyclass =job.getmapoutputkeyclass();

85 class valueclass =job.getmapoutputvalueclass();

86 rawcomparator comparator =job.getoutputvaluegroupingcomparator();

87//

2.根據引數usenewapi判斷執行runnewreduce還是runoldreduce。分析潤runnewreduce

88if

(usenewapi) else

99done(umbilical, reporter);

100 }

1.reduce過程中三個大的階段比較重要:copy、sort、reduce;

2.codec = initcodec()這句是檢查map的輸出是否是壓縮的,壓縮的則返回壓縮codec例項,否則返回null,這裡討論不壓縮的;

4.done(umbilical, reporter)這個方法用於做結束任務的一些清理工作:更新計數器updatecounters();如果任務需要提交,設定taks狀態為commit_pending,並利用taskumbilicalprotocol,匯報task完成,等待提交,然後呼叫commit提交任務;設定任務結束標誌位;結束reporter通訊線程;傳送最後一次統計報告(通過sendlastupdate方法);利用taskumbilicalprotocol報告結束狀態(通過senddone方法)。

hadoop2 作業執行過程之reduce過程

reduce階段就是處理map的輸出資料,大部分過程和map差不多 1 方法開始和maptask類似,包括initialize 初始化,根據情況看是否呼叫runjobcleanuptask 2 runjobsetuptask runtaskcleanuptask 之後進入正式的工作,主要有這麼三個步...

Hadoop2 執行機制簡介

在上圖中resourcemanager支援分層級的應用佇列,這些佇列享有集群一定比例的資源。從某種意義上講它就是乙個純粹的排程器,它在執行過程中不對應用進行監控和狀態跟蹤。同樣,它也不能重啟因應用失敗或者硬體錯誤而執行失敗的任務。resourcemanager 是基於應用程式對資源的需求進行排程的 ...

hadoop2體系結構

hadoop1的核心組成是兩部分,即hdfs和mapreduce。在hadoop2中變為hdfs和yarn。新的hdfs中的namenode不再是只有乙個了,可以有多個 目前只支援2個 每乙個都有相同的職能。這兩個namenode的地位如何哪?答 乙個是active狀態的,乙個是standby狀態的...