spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。
spark中需要shuffle輸出的map任務會為每個reduce建立對應的bucket,map產生的結果會根據設定的partitioner得到對應的bucketid,然後填充到相應的bucket中去。每個map的輸出結果可能包含所有的reduce所需要的資料,所以每個map會建立r個bucket(r是reduce的個數),m個map總共會建立m*r個bucket。
map建立的bucket其實對應磁碟上的乙個檔案,map的結果寫到每個bucket中其實就是寫到那個磁碟檔案中,這個檔案也被稱為blockfile,是disk block manager管理器通過檔名的hash值對應到本地目錄的子目錄中建立的。每個map要在節點上建立r個磁碟檔案用於結果輸出,map的結果是直接輸出到磁碟檔案上的,100kb的記憶體緩衝是用來建立fast buffered outputstream輸出流。這種方式乙個問題就是shuffle檔案過多。
針對上述shuffle過程產生的檔案過多問題,spark有另外一種改進的shuffle過程:consolidation shuffle,以期顯著減少shuffle檔案的數量。在consolidation shuffle中每個bucket並非對應乙個檔案,而是對應檔案中的乙個segment部分。job的map在某個節點上第一次執行,為每個reduce建立bucket對應的輸出檔案,把這些檔案組織成shufflefilegroup,當這次map執行完之後,這個shufflefilegroup可以釋放為下次迴圈利用;當又有map在這個節點上執行時,不需要建立新的bucket檔案,而是在上次的shufflefilegroup中取得已經建立的檔案繼續追加寫乙個segment;當前次map還沒執行完,shufflefilegroup還沒有釋放,這時如果有新的map在這個節點上執行,無法迴圈利用這個shufflefilegroup,而是只能建立新的bucket檔案組成新的shufflefilegroup來寫輸出。
比如乙個job有3個map和2個reduce:(1) 如果此時集群有3個節點有空槽,每個節點空閒了乙個core,則3個map會排程到這3個節點上執行,每個map都會建立2個shuffle檔案,總共建立6個shuffle檔案;(2) 如果此時集群有2個節點有空槽,每個節點空閒了乙個core,則2個map先排程到這2個節點上執行,每個map都會建立2個shuffle檔案,然後其中乙個節點執行完map之後又排程執行另乙個map,則這個map不會建立新的shuffle檔案,而是把結果輸出追加到之前map建立的shuffle檔案中;總共建立4個shuffle檔案;(3) 如果此時集群有2個節點有空槽,乙個節點有2個空core乙個節點有1個空core,則乙個節點排程2個map乙個節點排程1個map,排程2個map的節點上,乙個map建立了shuffle檔案,後面的map還是會建立新的shuffle檔案,因為上乙個map還正在寫,它建立的shufflefilegroup還沒有釋放;總共建立6個shuffle檔案。
reduce去拖map的輸出資料,spark提供了兩套不同的拉取資料框架:通過socket連線去取資料;使用netty框架去取資料。
每個節點的executor會建立乙個blockmanager,其中會建立乙個blockmanagerworker用於響應請求。當reduce的get_block的請求過來時,讀取本地檔案將這個blockid的資料返回給reduce。如果使用的是netty框架,blockmanager會建立shufflesender用於傳送shuffle資料。
並不是所有的資料都是通過網路讀取,對於在本節點的map資料,reduce直接去磁碟上讀取而不再通過網路框架。
reduce拖過來資料之後以什麼方式儲存呢?spark map輸出的資料沒有經過排序,spark shuffle過來的資料也不會進行排序,spark認為shuffle過程中的排序不是必須的,並不是所有型別的reduce需要的資料都需要排序,強制地進行排序只會增加shuffle的負擔。reduce拖過來的資料會放在乙個hashmap中,hashmap中儲存的也是對,key是map輸出的key,map輸出對應這個key的所有value組成hashmap的value。spark將shuffle取過來的每乙個對插入或者更新到hashmap中,來乙個處理乙個。hashmap全部放在記憶體中。
shuffle取過來的資料全部存放在記憶體中,對於資料量比較小或者已經在map端做過合併處理的shuffle資料,占用記憶體空間不會太大,但是對於比如group by key這樣的操作,reduce需要得到key對應的所有value,並將這些value組乙個陣列放在記憶體中,這樣當資料量較大時,就需要較多記憶體。
當記憶體不夠時,要不就失敗,要不就用老辦法把記憶體中的資料移到磁碟上放著。spark意識到在處理資料規模遠遠大於記憶體空間時所帶來的不足,引入了乙個具有外部排序的方案。shuffle過來的資料先放在記憶體中,當記憶體中儲存的對超過1000並且記憶體使用超過70%時,判斷節點上可用記憶體如果還足夠,則把記憶體緩衝區大小翻倍,如果可用記憶體不再夠了,則把記憶體中的對排序然後寫到磁碟檔案中。最後把記憶體緩衝區中的資料排序之後和那些磁碟檔案組成乙個最小堆,每次從最小堆中讀取最小的資料,這個和mapreduce中的merge過程類似。
mapreduce
spark
collect
在記憶體中構造了一塊資料結構用於map輸出的緩衝
沒有在記憶體中構造一塊資料結構用於map輸出的緩衝,而是直接把輸出寫到磁碟檔案
sort
map輸出的資料有排序
map輸出的資料沒有排序
merge
對磁碟上的多個spill檔案最後進行合併成乙個輸出檔案
在map端沒有merge過程,在輸出時直接是對應乙個reduce的資料寫到乙個檔案中,這些檔案同時存在併發寫,最後不需要合併成乙個
copy框架
jetty
netty或者直接socket流
對於本節點上的檔案
仍然是通過網路框架拖取資料
不通過網路框架,對於在本節點上的map輸出檔案,採用本地讀取的方式
copy過來的資料存放位置
先放在記憶體,記憶體放不下時寫到磁碟
一種方式全部放在記憶體;
另一種方式先放在記憶體
merge sort
最後會對磁碟檔案和記憶體中的資料進行合併排序
對於採用另一種方式時也會有合併排序的過程
通過上面的介紹,我們了解到,shuffle過程的主要儲存介質是磁碟,盡量的減少io是shuffle的主要優化方向。我們腦海中都有那個經典的儲存金字塔體系,shuffle過程為什麼把結果都放在磁碟上,那是因為現在記憶體再大也大不過磁碟,記憶體就那麼大,還這麼多張嘴吃,當然是分配給最需要的了。如果具有「土豪」記憶體節點,減少shuffle io的最有效方式無疑是盡量把資料放在記憶體中。下面列舉一些現在看可以優化的方面,期待經過我們不斷的努力,tdw計算引擎執行地更好。
spark作為mapreduce的高階架構,對於shuffle過程已經是優化了的,特別是對於那些具有爭議的步驟已經做了優化,但是spark的shuffle對於我們來說在一些方面還是需要優化的。
Spark的Shuffle過程介紹
spark的shuffle過程介紹 shuffle writer spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。spark中需要shuffle輸出的map任務...
Spark的Shuffle過程介紹
spark豐富了任務型別,有些任務之間資料流轉不需要通過shuffle,但是有些任務之間還是需要通過shuffle來傳遞資料,比如wide dependency的group by key。spark中需要shuffle輸出的map任務會為每個reduce建立對應的bucket,map產生的結果會根據...
hadoop和spark的shuffle異同點
spark 裡是shufflemaptask 的輸出進行 partition 不同的 partition 送到不同的 reducer spark 裡reducer 可能是下乙個 stage 裡的shufflemaptask 也可能是 resulttask reducer 以記憶體作緩衝區,邊 shu...