Shuffle機制流程原理

2021-08-31 18:39:06 字數 3084 閱讀 3324

mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(即將map輸出作為輸入傳給reducer)稱為shuffle。

shuffle階段是從map方法輸出資料以後開始到reduce方法輸入資料之前結束。

分割槽的數量 =  reducetask數量  = 結果檔案的數量

首先是由map方法處理後的key/value對輸入到環形緩衝區。

當環形緩衝區寫滿之後將會對緩衝區裡面的資料進行分割槽、排序操作,然後溢寫到磁碟中(也可以先使用combiner進行合併處理),可能環形緩衝區會進行多次溢寫。

將多次溢寫的資料按分割槽進行歸併排序,合併為乙個大的檔案,然後將這個大檔案通過壓縮手段進行壓縮(減小磁碟耗費量,減少網路io傳輸),最後寫入到磁碟中。

在reduce端,每乙個reduce按照分割槽號將每乙個map輸出的資料中的對應分割槽的資料拷貝到自己的緩衝區中(比如:reducetask1是處理1號分割槽的資料,則它就將所有map輸出的1號緩衝區的資料拷貝到自己的緩衝區中),若緩衝區不夠則將資料溢寫到磁碟。

然後對每乙個map來的資料進行歸併排序。

最後按照相同的key分組輸入到reduce方法中。

預設分割槽是根據key的hashcode對reducetasks個數取模得到的。使用者沒法控制哪個key儲存到哪個分割槽。

預設分割槽:

public class hashpartitionerextends partitioner }

但是使用者可以自定義分割槽,比如可以自定義按手機號進行分割槽(136開頭的0號分割槽,137開頭的1號分割槽。。。。。),

自定義partitioner步驟:

(1)自定義類繼承partitioner,重寫getpartition()方法

public class provincepartitioner extends partitionerelse if ("137".equals(prenum)) else if ("138".equals(prenum)) else if ("139".equals(prenum)) 

return partition;

}}

job.setpartitionerclass(custompartitioner.class);(2)在job驅動中,設定自定義partitioner:

(3)自定義partition後,要根據自定義partitioner的邏輯設定相應數量的reduce task

job.setnumreducetasks(5);

3)注意:

如果reducetask的數量》 getpartition的結果數,則會多產生幾個空的輸出檔案part-r-000xx;

如果1如果reducetask的數量=1,則不管maptask端輸出多少個分割槽檔案,最終結果都交給這乙個reducetask,最終也就只會產生乙個結果檔案 part-r-00000; 例如

:假設自定義分割槽數為5,則

(1)job.setnumreducetasks(1);會正常執行,只不過會產生乙個輸出檔案

(2)job.setnumreducetasks(2);會報錯

(3)job.setnumreducetasks(6);大於5,程式會正常執行,會產生空檔案

在maptask讀取壓縮檔案前可以進行先解壓:

資料解壓:createinputstream(inputstream in)建立乙個compressioninputstream,讀取壓縮資料。

也可以在reduce處理完成之後,再將資料以壓縮的格式進行寫出到檔案。

優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。

缺點:壓縮率比gzip要低一些;hadoop本身不支援,需要安裝;在應用中對lzo格式的檔案需要做一些特殊處理(為了支援split需要建索引,還需要指定inputformat為lzo格式)。

應用場景:乙個很大的文字檔案,壓縮之後還大於200m以上的可以考慮,而且單個檔案越大,lzo優點越越明顯。

優點:高速壓縮速度和合理的壓縮率。

缺點:不支援split;壓縮率比gzip要低;hadoop本身不支援,需要安裝;

應用場景:當

mapreduce作業的

map輸出的資料比較大的時候,作為

map到

reduce的中間資料的壓縮格式;或者作為乙個

mapreduce作業的輸出和另外乙個

mapreduce作業的輸入。

壓縮可以在mapreduce作用的任意階段啟用。

示例:可以在driver類中開啟並指定使用哪種壓縮方式

1、map端輸出開啟並設定壓縮方式:

// 開啟map端輸出壓縮

configuration.setboolean("mapreduce.map.output.compress",true);

// 設定map端輸出壓縮方式

configuration.setclass("mapreduce.map.output.compress.codec", bzip2codec.class

, compressioncodec.class

);/2、

2、reduce端輸出開啟並設定壓縮方式:

// 設定reduce端輸出壓縮開啟

fileoutputformat.

setcompressoutput

(job,true);

// 設定壓縮的方式

fileoutputformat.

setoutputcompressorclass

(job, bzip2codec.class

);

Spark之Shuffle機制和原理

shuffle就是對資料進行重組,由於分布式計算的特性和要求,在實現細節上更加繁瑣和複雜 在mapreduce框架,shuffle是連線map和reduce之間的橋梁,map階段通過shuffle讀取資料並輸出到對應的reduce 而reduce階段負責從map端拉取資料並進行計算。在整個shuff...

shuffle流程理解

什麼是shuffle流程?mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序,將map輸出作為輸入傳給reducer的過程稱為shuffle.簡單來說就是 從map輸出開始,到reduce輸入之前這個階段。如下圖所示 每個map任務的結果輸出到緩衝區中 預設大小為100m的環...

Shuffle機制及優化

map方法之後,reduce方法之前的資料處理過程稱之為shuffle。具體shuffle過程詳解 1 maptask收集我們的map 方法輸出的kv對,放到記憶體緩衝區中 2 從記憶體緩衝區不斷溢位本地磁碟檔案,可能會溢位多個檔案 3 多個溢位檔案會被合併成大的溢位檔案 4 在溢位過程及合併的過程...