shuffle就是對資料進行重組,由於分布式計算的特性和要求,在實現細節上更加繁瑣和複雜
在mapreduce框架,shuffle是連線map和reduce之間的橋梁,map階段通過shuffle讀取資料並輸出到對應的reduce;而reduce階段負責從map端拉取資料並進行計算。在整個shuffle過程中,往往伴隨著大量的磁碟和網路i/o。所以shuffle效能的高低也直接決定了整個程式的效能高低。spark也會有自己的shuffle實現過程
什麼是hashshuffle?
每乙個task的計算結果根據key的hashcode與reduce task的個數取模決定寫入到哪乙個分割槽檔案,這樣就能保證相同的資料一定是落入到某乙個分割槽檔案中。
shuffle可能面臨的問題?
磁碟小檔案的個數= map task num × reduce task num
磁碟小檔案過多帶來什麼問題?
優化後的hashshufflemanager
使用hashshuffle並且開啟合併機制,shuffle過程中磁碟小檔案個數為 cores × reduce task num
該機制每乙個maptask不會為後續的任務建立單獨的檔案,而是會將所有的task結果寫入同乙個檔案,並且對應生成乙個索引檔案。以前的資料是放在記憶體快取中,等到資料完了再刷到磁碟,現在為了減少記憶體的使用,在記憶體不夠用的時候,可以將輸出溢寫到磁碟,結束的時候,再將這些不同的檔案聯合記憶體的資料一起進行歸併,從而減少記憶體的使用量。
sortshuffle的執行機制主要分成兩種:
sortshufflemanager普通執行機制
比較適合資料量很大的場景或者集群規模很大
sortshufflemanager bypass執行機制
主要用於處理reducer任務數量比較少或不需要排序和聚合的shuffle操作,資料是直接寫入檔案,資料量較大的時候,網路i/o和記憶體負擔較重。
bypass執行機制的觸發條件如下:
shuffle reduce task數量小於spark.shuffle.sort.bypassmergethreshold引數的值。
spark之shuffle引數優化
spark.shuffle.file.buffer預設32k shuffle write task端的緩衝區,到達閾值後,溢寫到磁碟。將數值調大,減少io操作,提公升整體效能 具體數值根據實際情況設定 spark.reducer.maxsizeinflight預設48m reduce shuffle...
Spark 之 shuffle 相關的運算元
目錄 1 repartition 類 repartition類的操作 比如repartition repartitionandsortwithinpartitions coalesce等。重分割槽 一般會shuffle,因為需要在整個集群中,對之前所有的分割槽的資料進行隨機 均勻地打亂,然後把資料放...
Hadoop之Shuffle機制詳解
一般把資料從map階段輸出到reduce階段的過程叫shuffle,所以shuffle的作用範圍是map階段資料輸出到reduce階段資料輸入這一整個中間過程!1 collect階段 將maptask的結果輸出到預設大小為100m的環形緩衝區,儲存的是key value序列化資料,partition...