這張是官方對shuffle過程的描述。但我可以肯定的是,單從這張圖你基本不可能明白shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。後面我會具體描述shuffle的事實情況,所以這裡你只要清楚shuffle的大致範圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, shuffle描述著資料從map task輸出到reduce task輸入的這段過程。
在hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下reduce執行時需要跨節點去拉取其它節點上的map task結果。如果集群正在執行的job有很多,那麼task的正常執行對集群內部的網路資源消耗會很嚴重。這種網路消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於記憶體,磁碟io對job完成時間的影響也是可觀的。從最基本的要求來說,我們對shuffle過程的期望可以有:
ok,看到這裡時,大家可以先停下來想想,如果是自己來設計這段shuffle過程,那麼你的設計目標是什麼。我想能優化的地方主要在於減少拉取資料的量及盡量使用記憶體而不是磁碟。
我的分析是基於hadoop0.21.0的原始碼,如果與你所認識的shuffle過程有差別,不吝指出。我會以wordcount為例,並假設它有8個map task和3個reduce task。從上圖看出,shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開。
先看看map端的情況,如下圖:
如map 端的細節圖,shuffle在reduce端的過程也能用圖上標明的三點來概括。當前reduce copy資料的前提是它要從jobtracker獲得有哪些map task已執行結束,這段過程不表,有興趣的朋友可以關注下。reducer真正執行之前,所有的時間都是在拉取資料,做merge,且不斷重複地在做。如前面的方式一樣,下面我也分段地描述reduce 端的shuffle細節:
1. copy過程,簡單地拉取資料。reduce程序啟動一些資料copy執行緒(fetcher),通過http方式請求map task所在的tasktracker獲取map task的輸出檔案。因為map task早已結束,這些檔案就歸tasktracker管理在本地磁碟中。
2. merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活,它基於jvm的heap size設定,因為shuffle階段reducer不執行,所以應該把絕大部分的記憶體都給shuffle用。這裡需要強調的是,merge有三種形式:1)記憶體到記憶體 2)記憶體到磁碟 3)磁碟到磁碟。預設情況下第一種形式不啟用,讓人比較困惑,是吧。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的那個檔案。
3. reducer的輸入檔案。不斷地merge後,最後會生成乙個「最終檔案」。為什麼加引號?因為這個檔案可能存在於磁碟上,也可能存在於記憶體中。對我們來說,當然希望它存放於記憶體中,直接作為reducer的輸入,但預設情況下,這個檔案是存放於磁碟中的。至於怎樣才能讓這個檔案出現在記憶體中,之後的
效能優化篇
我再說。當reducer的輸入檔案已定,整個shuffle才最終結束。然後就是reducer執行,把結果放到hdfs上。
Map Reduce的過程解析
map reduce的過程首先是由客戶端提交乙個任務開始的。提交任務主要是通過jobclient.runjob jobconf 靜態函式實現的 public static runningjob runjob jobconf job throws ioexception finally finally...
map reduce 過程的認識
map reduce 過程的認識 最初我一直簡單的以為map 的工作就是將資料打散,而reduce 就是將map 打散後的資料合併。雖然之前跑過wordcount 的例子,但之前只是對輸出reduce 最終的結果感興趣,對控制台列印的日誌資訊完全不懂。這幾天我們團隊在探索pagerank 才開始對m...
Map Reduce過程概述
map reduce的過程首先是由客戶端提交乙個任務開始的。提交任務主要是通過jobclient.runjob jobconf 靜態函式實現的 public static runningjob runjob jobconf job throws ioexception finally finally...