這個階段要完成以下工作:
publicinte***ce inputsplit extends
writable
我們看到inputsplit中記錄了原始資料的長度length,而location則有多個(是乙個陣列)。location只記錄了主機名,它用於在指派map task的時候,讓map task離它處理的split近一些。而記錄length的作用是讓最大的split先被處理,這是hadoop為了最小化作業執行時間而採取的貪心策略。
jobtracker根據input split從hdfs中獲取每個split代表的真實資料,對於每個split都要生成乙個map task來處理它。至於生成多少reduce task則由屬性mapred.reduce.tasks來決定。每個task都獲得乙個id。乙個map task和它要處理的split在集群中盡可能地鄰近,最好是在同乙個tasktracker上。
只要task tracker一啟動它就周期性地向job tracker傳送心跳,表示自己還活著,心跳中還包含其他資訊,告訴job tracer自己的忙閒狀態,如果是閒,那job tracker就可以給它分配新任務了。
每個tasktracker上都有固定數目(這取決於該節點擁有的cpu和記憶體)的slots來安放map task和reduce task。比如有的tasktracker上可以同時執行2個map task和2個reduce task。
首先每個task tracker要從共享的filesystem中拷貝兩樣東西到本地:包含程式**的jar檔案、distributedcachefile。然後為本次task建立乙個工作目錄。最後建立乙個jvm執行作業。也就是說在task tracker上可以同時執行多個jvm,每個jvm執行乙個task(map tack或reduce task) ,所以在map或reduce中發生的bug只會影響本jvm(使之中止或崩潰),但不會影響到task tracker。jvm在不同的task之間是可以重用的,但是乙個jvm在同一時刻只能執行乙個task。
每個task都要及時地向它所在的tasktracker匯報進度和狀態,tasktracker及時地向jobtracker匯報執行進度和狀態(比如執行了多少個task,哪個失效了等等),這些資訊最終由jobtracker反饋給使用者。
解釋一下「進度」,對於map task來說,就是已處理的資料和input split總長度的比值;但是對於reduce task來說就有些特別,事實上reduce task包括3個階段:copy、sort和reduce,每完成乙個階段進度就前進了33%。
jobtracker清理一些狀態資料和臨時的輸出檔案,並通知tasktacker做同樣的清理工作。
對於懸掛的task,如果task tracker在一段時間(預設是10min,可以通過mapred.task.timeout屬性值來設定,單位是毫秒)內一直沒有收到它的進度報告,則把它標記為失效。tasktracker通過心跳包告知jobtracker某個task attempt失敗了,則jobtracker把該task盡量分配給另外乙個task tracker來執行。如果同乙個task連續4次(該值可以通過mapred.map.max.attempts和mapred.reduce.max.attempts屬性值來設定)都執行失敗,那jobtracker就不會再做更多的嘗試了,本次job也就宣告失敗了。
對於某些應用可能不想因為個別task的失敗而導致整個job的失敗,你可以設定失效的task小於一定比例時job仍然是成功的,通過這兩個屬性:mapred.max.map.failures.percent和mapred.max.reduce.failures.percent。
上面說了task失效,還有一種情況是tasktracker失效。如果tasktracker執行很慢或直接crash了,則它停止向jobtracker傳送心跳。若jobtracker在10min(這個值可以通過mapred.task.tracker.expiry.interval屬性進行設定)沒有收到tasktracker的心跳就把它從排程池中移除。由於整個tasktracker節點都失效,它上面的已經執行完畢的map task的輸出也不能再被reduce task獲取,所以之前分配給該tasktracker的所有map task(不管是執行完畢還是沒有完畢)都要放到另乙個節點上重新執行,已經執行完畢的reduce task由於結果已經輸出到了最終檔案裡面,就不需要重新執行了。即使tasktracker沒有失效,當它上面失效的task太多時同樣會被列入黑名單。
如果失效的是jobtracker那就無藥可救了,因為jobtracker只有乙個,沒有誰可以替代它。
預設情況下採用fifo原則,先提交的作業先被排程。其實job也可以設定優先順序,通過mapred.job.priority屬性或呼叫setjobpriority()方法。
「公平排程器」讓每個job獲得相同的資源,所以小作業會比大作業完成得早。
在map側
每個map task都有乙個迴圈利用的緩衝區(預設大小是100m,通過io.sort.mb設定),它有輸出先放到緩衝區裡。當緩衝區滿80%(該值可以通過io.sort.spill.percent設定)後,後台執行緒會把它spill到磁碟。當緩衝區填滿100%時,map的輸出沒地方做,map task將被阻塞。
在被寫入磁碟之前,資料首先會被分割槽(根據hadoop的partitioner),在每個分割槽中資料會被放到記憶體中進行排序。maptask結束的時候所有的分割槽會被合併成乙個排序好的檔案。
在reduce側
至於reduce階段自然就是執行reduce()函式嘍.
由於shuffle要占用大量記憶體,所以我們設計的map和reduce要盡量地小,占用少量的記憶體為好。
乙個task執行得慢可能出於多種原因,包括硬體老化。hadoop的策略是當發現task執行得慢時就在乙個新的jvm甚至是新的節點上啟動它的乙個「備份」。
當所有的task都已經啟動並執行了一段時間後,如果發現某些task比其他的執行的都慢,這時就會啟動乙個speculatie task跟原task執行相同的任務--只要其中有乙個先完成,那另乙個就可以kill掉了。
當然採用speculatie task就會產生雙份的輸出,要處理好這個問題。
對於海量的資料檔案,存在壞記錄(格式不對、資料缺失等)是很正常的,在程式中必須妥善處理,否則可能會因為極個別的「壞記錄」中斷了整個job。另外你也可以選擇skip bad records。skipping模式預設情況下是關閉的,你可以使用skipbadrecord類分別為map和reduce開啟skipping模式。但是skipping模式預設情況下對於每個task只能跳過1壞記錄而不妨礙task的成功結束,你可以通過設定mapred.map.max.attempts和mapred.reduce.max.attempts來把容忍值調得大一些。
hadoop可以為task提供執行時的環境資訊:
mapred.job.id jobid
mapred.tip.id taskid
mapred.task.id task attempt id,注意的task id區別
mapred.task.partition task id在job中的序號
mapred.task.is.map 判斷task是否為map task
由於task可能會失效,這樣乙個task就會對應多個task attempt,就會有多次輸出。這了解決這個問題,每次task attempt的輸出目錄為$/_temporary/$,當任務成功結束後再把它複製到$中。那乙個task如何知道自己的工作目錄呢?可以從配置檔案中檢索mapred.word.output.dir的屬性值,也可呼叫fileoutputformat的靜態方法getworkoutputpath()。
MapReduce工作流程
1.流程示意圖 mapreduce詳細工作流程 一 mapreduce詳細工作流程 二 流程詳解 上面是整個mapreduce最全工作流程,但是shuffle過程知識從第7步開始到第16步結束,具體shuffle過程詳解 1 maptask收集我們的map 方法輸出的kv對,放到記憶體緩衝區中 2 ...
map reduce的工作流程
mapreduce工作流程 wordcount 3.map shuffle 對map結果的key根據reducer的個數進行hash寫入緩衝區 key,value,partition 當緩衝區的大小占用了80 左右,將緩衝區的資料寫入磁碟,並根據partition key進行排序,生成乙個 多個溢寫...
整理 map reduce工作流程
shuffle階段 shuffle是指從map 產生輸出開始,包括系統執行排序以及傳送map 輸出到reducer 作為輸入的過程。首先從map 端開始分析。當map 開始產生輸出時,它並不是簡單的把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更複雜,資料首先是寫到記憶體中的乙個...