這張是官方對shuffle過程的描述。但我可以肯定的是,單從這張圖你基本不可能明白shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。後面我會具體描述shuffle的事實情況,所以這裡你只要清楚shuffle的大致範圍就成-怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, shuffle描述著資料從map task輸出到reduce task輸入的這段過程。
在hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。當然很多情況下reduce執行時需要跨節點去拉取其它節點上的map task結果。如果集群正在執行的job有很多,那麼task的正常執行對集群內部的網路資源消耗會很嚴重。這種網路消耗是正常的,我們不能限制,能做的就是最大化地減少不必要的消耗。還有在節點內,相比於記憶體,磁碟io對job完成時間的影響也是可觀的。從最基本的要求來說,我們對shuffle過程的期望可以有:
ok,看到這裡時,大家可以先停下來想想,如果是自己來設計這段shuffle過程,那麼你的設計目標是什麼。我想能優化的地方主要在於減少拉取資料的量及盡量使用記憶體而不是磁碟。
我的分析是基於hadoop0.21.0的原始碼,如果與你所認識的shuffle過程有差別,不吝指出。我會以wordcount為例,並假設它有8個map task和3個reduce task。從上圖看出,shuffle過程橫跨map與reduce兩端,所以下面我也會分兩部分來展開。
先看看map端的情況,如下圖:
如map 端的細節圖,shuffle在reduce端的過程也能用圖上標明的三點來概括。當前reduce copy資料的前提是它要從jobtracker獲得有哪些map task已執行結束,這段過程不表,有興趣的朋友可以關注下。reducer真正執行之前,所有的時間都是在拉取資料,做merge,且不斷重複地在做。如前面的方式一樣,下面我也分段地描述reduce 端的shuffle細節:
1. copy過程,簡單地拉取資料。reduce程序啟動一些資料copy執行緒(fetcher),通過http方式請求map task所在的tasktracker獲取map task的輸出檔案。因為map task早已結束,這些檔案就歸tasktracker管理在本地磁碟中。
2. merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活,它基於jvm的heap size設定,因為shuffle階段reducer不執行,所以應該把絕大部分的記憶體都給shuffle用。這裡需要強調的是,merge有三種形式:1)記憶體到記憶體 2)記憶體到磁碟 3)磁碟到磁碟。預設情況下第一種形式不啟用,讓人比較困惑,是吧。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的那個檔案。
3. reducer的輸入檔案。不斷地merge後,最後會生成乙個「最終檔案」。為什麼加引號?因為這個檔案可能存在於磁碟上,也可能存在於記憶體中。對我們來說,當然希望它存放於記憶體中,直接作為reducer的輸入,但預設情況下,這個檔案是存放於磁碟中的。至於怎樣才能讓這個檔案出現在記憶體中,之後的效能優化篇我再說。當reducer的輸入檔案已定,整個shuffle才最終結束。然後就是reducer執行,把結果放到hdfs上。
shuf處理文字
在cu上面看到了乙個帖子,帖子的內容即要求是 請教一下,我需要頻繁不斷地聯接9臺伺服器執行某個相同的服務。但我有特殊的要求 1。每次都按不同的順序來訪問這9臺伺服器。例如 135987642,下次又是亂序依次訪問。2。希望這9臺伺服器的主機名通過乙個shell 指令碼整合不需要另外起乙個txt文件來...
MapReduce過程詳解
1.輸入分片 input split 在進行map計算之前,mapreduce會根據輸入檔案計算輸入分片 input split 每個輸入分片 input split 針對乙個map任務。2.map階段 就是我們寫的map函式,map函式效率相對好控制,而且一般map操作都是本地化操作也就是在資料儲...
詳解MapReduce過程
textinputformat的原始碼注釋為 檢視inputformat介面的原始碼注釋我們了解到這個介面的作用為 在inputformat的源 中有如下兩個方法 inputsplit getsplits jobconf job,int numsplits throws ioexception 獲取...