mapreduce 是現今乙個非常流行的分布式計算框架,它被設計用於平行計算海量資料。第乙個提出該技術框架的是google 公司,而google 的靈感則來自於函式式程式語言,如lisp,scheme,ml 等。
mapreduce 框架的核心步驟主要分兩部分:map 和reduce。當你向mapreduce 框架提交乙個計算作業時,它會首先把計算作業拆分成若干個map 任務,然後分配到不同的節點上去執行,每乙個map 任務處理輸入資料中的一部分,當map 任務完成後,它會生成一些中間檔案,這些中間檔案將會作為reduce 任務的輸入資料。reduce 任務的主要目標就是把前面若干個map 的輸出彙總到一起並輸出。
本文的重點是剖析mapreduce 的核心過程——shuffle和sort。在本文中,shuffle是指從map 產生輸出開始,包括系統執行排序以及傳送map 輸出到reducer 作為輸入的過程。在這裡我們將去**shuffle是如何工作的,因為對基礎的理解有助於對mapreduce 程式進行調優。
首先從map 端開始分析。當map 開始產生輸出時,它並不是簡單的把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更複雜,資料首先是寫到記憶體中的乙個緩衝區,並做了一些預排序,以提公升效率。
每個map 任務都有乙個用來寫入輸出資料的迴圈記憶體緩衝區。這個緩衝區預設大小是100mb,可以通過io.sort.mb 屬性來設定具體大小。當緩衝區中的資料量達到乙個特定閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 預設是0.80)時,系統將會啟動乙個後台執行緒把緩衝區中的內容spill 到磁碟。在spill 過程中,map 的輸出將會繼續寫入到緩衝區,但如果緩衝區已滿,map 就會被阻塞直到spill 完成。spill 執行緒在把緩衝區的資料寫到磁碟前,會對它進行乙個二次快速排序,首先根據資料所屬的partition 排序,然後每個partition 中再按key 排序。輸出包括乙個索引檔案和資料檔案。如果設定了combiner,將在排序輸出的基礎上執行。combiner 就是乙個mini reducer,它在執行map 任務的節點本身執行,先對map 的輸出做一次簡單reduce,使得map 的輸出更緊湊,更少的資料會被寫入磁碟和傳送到reducer。spill 檔案儲存在由mapred.local.dir指定的目錄中,map 任務結束後刪除。
每當記憶體中的資料達到spill 閥值的時候,都會產生乙個新的spill 檔案,所以在map任務寫完它的最後乙個輸出記錄時,可能會有多個spill 檔案。在map 任務完成前,所有的spill 檔案將會被歸併排序為乙個索引檔案和資料檔案,如圖3 所示。這是乙個多路歸併過程,最大歸併路數由io.sort.factor 控制(預設是10)。如果設定了combiner,並且spill檔案的數量至少是3(由min.num.spills.for.combine 屬性控制),那麼combiner 將在輸出檔案被寫入磁碟前執行以壓縮資料。
對寫入到磁碟的資料進行壓縮(這種壓縮同combiner 的壓縮不一樣)通常是乙個很好的方法,因為這樣做使得資料寫入磁碟的速度更快,節省磁碟空間,並減少需要傳送到reducer 的資料量。預設輸出是不被壓縮的, 但可以很簡單的設定mapred.compress.map.output 為true 啟用該功能。壓縮所使用的庫由mapred.map.output.compression.codec 來設定,
目前主要有以下幾個壓縮格式:
deflate 無deflate .deflate 不支援不可以
gzip gzip deflate .gz 不支援不可以
zip zip deflate .zip 支援可以
bzip2 bzip2 bzip2 .bz2 不支援可以
lzo lzop lzo .lzo 不支援不可以
bbs.hadoopor.com --------hadoop 技術論壇
當spill 檔案歸併完畢後,map 將刪除所有的臨時spill 檔案,並告知tasktracker 任務已完成。reducers 通過http 來獲取對應的資料。用來傳輸partitions 資料的工作執行緒數由tasktracker.http.threads 控制,這個設定是針對每乙個tasktracker 的,並不是單個map,預設值為40,在執行大作業的大集群上可以增大以提公升資料傳輸速率。
現在讓我們轉到shuffle的reduce 部分。map 的輸出檔案放置在執行map 任務的tasktracker 的本地磁碟上(注意:map 輸出總是寫到本地磁碟,但reduce 輸出不是,一般是寫到hdfs),它是執行reduce 任務的tasktracker 所需要的輸入資料。reduce 任務的輸入資料分布在集群內的多個map 任務的輸出中,map 任務可能會在不同的時間內完成,只要有其中的乙個map 任務完成,reduce 任務就開始拷貝它的輸出。這個階段稱之為拷貝階段。reduce 任務擁有多個拷貝執行緒, 可以並行的獲取map 輸出。可以通過設定mapred.reduce.parallel.copies 來改變執行緒數,預設是5。
如果map 輸出足夠小,它們會被拷貝到reduce tasktracker 的記憶體中(緩衝區的大小
由mapred.job.shuffle.input.buffer.percent 控制,制定了用於此目的的堆記憶體的百分比);如果緩衝區空間不足,會被拷貝到磁碟上。當記憶體中的緩衝區用量達到一定比例閥值(由mapred.job.shuffle.merge.threshold 控制),或者達到了map 輸出的閥值大小(由mapred.inmem.merge.threshold 控制),緩衝區中的資料將會被歸併然後spill 到磁碟。
拷貝來的資料疊加在磁碟上,有乙個後台執行緒會將它們歸併為更大的排序檔案,這樣做節省了後期歸併的時間。對於經過壓縮的map 輸出,系統會自動把它們解壓到記憶體方便對其執行歸併。
當所有的map 輸出都被拷貝後,reduce 任務進入排序階段(更恰當的說應該是歸併階段,因為排序在map 端就已經完成),這個階段會對所有的map 輸出進行歸併排序,這個工作會重複多次才能完成。
假設這裡有50 個map 輸出(可能有儲存在記憶體中的),並且歸併因子是10(由io.sort.factor 控制,就像map 端的merge 一樣),那最終需要5 次歸併。每次歸併會把10個檔案歸併為乙個,最終生成5 個中間檔案。在這一步之後,系統不再把5 個中間檔案歸併壓縮格式工具演算法副檔名支援分卷是否可分割成乙個,而是排序後直接「喂」給reduce 函式,省去向磁碟寫資料這一步。最終歸併的資料可以是混合資料,既有記憶體上的也有磁碟上的。由於歸併的目的是歸併最少的檔案數目,使得在最後一次歸併時總檔案個數達到歸併因子的數目,所以每次操作所涉及的檔案個數在實際中會更微妙些。譬如,如果有40 個檔案,並不是每次都歸併10 個最終得到4 個檔案,相反第一次只歸併4 個檔案,然後再實現三次歸併,每次10 個,最終得到4 個歸併好的檔案和6 個未歸併的檔案。要注意,這種做法並沒有改變歸併的次數,只是最小化寫入磁碟的資料優化措施,因為最後一次歸併的資料總是直接送到reduce 函式那裡。
在reduce 階段,reduce 函式會作用在排序輸出的每乙個key 上。這個階段的輸出被直接寫到輸出檔案系統,一般是hdfs。在hdfs 中,因為tasktracker 節點也執行著乙個datanode 程序,所以第乙個塊備份會直接寫到本地磁碟。
到此,mapreduce 的shuffle和sort分析完畢。
MapReduce中的排序
hadoop的計算模型就是map reduce,每乙個計算任務會被分割成很多互不依賴的map reduce計算單元,將所有的計算單元執行完畢後整個計算任務就完成了。因為計算單元之間互不依賴所以計算單元可以分配到不同的計算機上執行,這樣就可以將計算壓力平攤到多個機器上面。當然效能線性提高是有條件的,前...
MapReduce中的shuffle機制
shuffle機制是mapreduce整個處理過程中的核心機制,涉及到了分組 排序 資料快取以及中間結果傳遞 map結果怎麼交付給reduce 其整個過程可以用一張圖表示。當沒有自定義分組時,預設所有的key在乙個分組中。如果有自定義分組,則按照自定義的分組邏輯進行分組,對應圖中的partition...
mapReduce中的shuffle過程
從map 的輸出到reduce 的輸入,中間的過程被稱為shuffle過程。map side 1.在寫入磁碟之前,會先寫入環形緩衝區 circular memory buffer 預設100m mapreduce.task.io.sort.mb可修改 當緩衝區內容達到80m mapre duce.m...