mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。
shuffle階段是從map方法輸出資料以後開始到reduce方法輸入資料之前結束。
分割槽的數量 = reducetask數量 = 結果檔案的數量
首先是由map方法處理後的key/value對輸入到環形緩衝區。
當環形緩衝區寫滿之後將會對緩衝區裡面的資料進行分割槽、排序操作,然後溢寫到磁碟中(也可以先使用combiner進行合併處理),可能環形緩衝區會進行多次溢寫。
將多次溢寫的資料按分割槽進行歸併排序,合併為乙個大的檔案,然後將這個大檔案通過壓縮手段進行壓縮(減小磁碟耗費量,減少網路io傳輸),最後寫入到磁碟中。
在reduce端,每乙個reduce按照分割槽號將每乙個map輸出的資料中的對應分割槽的資料拷貝到自己的緩衝區中(比如:reducetask1是處理1號分割槽的資料,則它就將所有map輸出的1號緩衝區的資料拷貝到自己的緩衝區中),若緩衝區不夠則將資料溢寫到磁碟。
然後對每乙個map來的資料進行歸併排序。
最後按照相同的key分組輸入到reduce方法中。
預設分割槽是根據key的hashcode對reducetasks個數取模得到的。使用者沒法控制哪個key儲存到哪個分割槽。
預設分割槽:
public class hashpartitionerextends partitioner }
但是使用者可以自定義分割槽,比如可以自定義按手機號進行分割槽(136開頭的0號分割槽,137開頭的1號分割槽。。。。。),
自定義partitioner步驟:
(1)自定義類繼承partitioner,重寫getpartition()方法
public class provincepartitioner extends partitionerelse if ("137".equals(prenum)) else if ("138".equals(prenum)) else if ("139".equals(prenum))
return partition;
}}
job.setpartitionerclass(custompartitioner.class);(2)在job驅動中,設定自定義partitioner:
(3)自定義partition後,要根據自定義partitioner的邏輯設定相應數量的reduce task
job.setnumreducetasks(5);
3)注意:
如果reducetask的數量》 getpartition的結果數,則會多產生幾個空的輸出檔案part-r-000xx;
如果1如果reducetask的數量=1,則不管maptask端輸出多少個分割槽檔案,最終結果都交給這乙個reducetask,最終也就只會產生乙個結果檔案 part-r-00000; 例如
:假設自定義分割槽數為5,則
(1)job.setnumreducetasks(1);會正常執行,只不過會產生乙個輸出檔案
(2)job.setnumreducetasks(2);會報錯
(3)job.setnumreducetasks(6);大於5,程式會正常執行,會產生空檔案
在maptask讀取壓縮檔案前可以進行先解壓:
資料解壓:createinputstream(inputstream in)建立乙個compressioninputstream,讀取壓縮資料。
也可以在reduce處理完成之後,再將資料以壓縮的格式進行寫出到檔案。
優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。
缺點:壓縮率比gzip要低一些;hadoop本身不支援,需要安裝;在應用中對lzo格式的檔案需要做一些特殊處理(為了支援split需要建索引,還需要指定inputformat為lzo格式)。
應用場景:乙個很大的文字檔案,壓縮之後還大於200m以上的可以考慮,而且單個檔案越大,lzo優點越越明顯。
優點:高速壓縮速度和合理的壓縮率。
缺點:不支援split;壓縮率比gzip要低;hadoop本身不支援,需要安裝;
應用場景:當
mapreduce作業的
map輸出的資料比較大的時候,作為
map到
reduce的中間資料的壓縮格式;或者作為乙個
mapreduce作業的輸出和另外乙個
mapreduce作業的輸入。
壓縮可以在mapreduce作用的任意階段啟用。
示例:可以在driver類中開啟並指定使用哪種壓縮方式
1、map端輸出開啟並設定壓縮方式:
// 開啟map端輸出壓縮
configuration.setboolean("mapreduce.map.output.compress",true);
// 設定map端輸出壓縮方式
configuration.setclass("mapreduce.map.output.compress.codec", bzip2codec.class
, compressioncodec.class
);/2、
2、reduce端輸出開啟並設定壓縮方式:
// 設定reduce端輸出壓縮開啟
fileoutputformat.
setcompressoutput
(job,true);
// 設定壓縮的方式
fileoutputformat.
setoutputcompressorclass
(job, bzip2codec.class
);
Spark之Shuffle機制和原理
shuffle就是對資料進行重組,由於分布式計算的特性和要求,在實現細節上更加繁瑣和複雜 在mapreduce框架,shuffle是連線map和reduce之間的橋梁,map階段通過shuffle讀取資料並輸出到對應的reduce 而reduce階段負責從map端拉取資料並進行計算。在整個shuff...
shuffle流程理解
什麼是shuffle流程?mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序,將map輸出作為輸入傳給reducer的過程稱為shuffle.簡單來說就是 從map輸出開始,到reduce輸入之前這個階段。如下圖所示 每個map任務的結果輸出到緩衝區中 預設大小為100m的環...
Shuffle機制及優化
map方法之後,reduce方法之前的資料處理過程稱之為shuffle。具體shuffle過程詳解 1 maptask收集我們的map 方法輸出的kv對,放到記憶體緩衝區中 2 從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案 3 多個溢位檔案會被合併成大的溢位檔案 4 在溢位過程及合併的過程...