Spark Shuffle記憶體分析

2021-07-09 02:08:44 字數 3133 閱讀 3395

分布式系統裡的shuffle 階段往往是非常複雜的,而且分支條件也多,我只能按著我關注的線去描述。肯定會有不少謬誤之處,我會根據自己理解的深入,不斷更新這篇文章。

前言用spark寫程式,乙個比較蛋疼的地方就是oom,或者gc嚴重,導致程式響應緩慢,一般這種情況都會出現在shuffle階段。shuffle 是乙個很複雜的過程,任何乙個環節都足夠寫一篇文章。所以這裡,我嘗試換個方式,從實用的角度出發,讓讀者有三方面的收穫:

構建出shuffle的乙個概覽圖

剖析哪些環節,哪些**可能會讓記憶體產生問題

控制相關記憶體的引數

有時候,我們寧可程式慢點,也不要oom,至少要拋棄來跑步起來,希望這篇文章能夠讓你達成這個目標。

同時我們會提及一些類名,這些類方便你自己想更深入了解時,可以方便的找到他們,自己去探個究竟。

shuffle 概覽

spark 的shuffle 分為 write,read 兩階段。我們預先建立三個概念:

write 對應的是shufflemaptask,具體的寫操作externalsorter來負責

read 階段由shufflerdd裡的hashshufflereader來完成。如果拉來的資料如果過大,需要落地,則也由externalsorter來完成的

所有write 寫完後,才會執行read。 他們被分成了兩個不同的stage階段。

也就是說,shuffle write ,shuffle read 兩階段都可能需要落磁碟,並且通過disk merge 來完成最後的sort歸併排序。

我們看下面的:

**於該文章:

圖中的bucket 部分以及shuffle file 部分,都是由 externalsorter 來完成的。

shuffle read 部分, 讀取資料由hashshufflereader來完成,並且透過externalsorter來完成disk 排序

檔案持有數按core來進行計算,假設core數為c,假設reduce數為r, 那麼乙個executor 的持有檔案數為: c*r

shuffle write 記憶體消耗分析

shuffle write 的入口鏈路為:

org.apache.spark.scheduler.shufflemaptask—> org.apache.spark.shuffle.sort.sortshufflewriter

—> org.apache.spark.util.collection.externalsorter

會產生記憶體瓶頸的其實就是 org.apache.spark.util.collection.externalsorter。我們看看這個複雜的externalsorter都有哪些地方在占用記憶體:

第乙個地:

我們知道,資料都是先寫記憶體,記憶體不夠了,才寫磁碟。這裡的map就是那個放資料的記憶體了。

private var data = new array[anyref](2 * capacity)

也就是他消耗的並不是storage的記憶體,所謂storage記憶體,指的是由blockmanager管理起來的記憶體。

spark.shuffle.file.buffer=32k

控制的。資料獲取的過程中,序列化反序列化,也是需要空間的,所以spark 對數量做了限制,通過如下引數控制:

spark.shuffle.spill.batchsize=10000

假設乙個executor的可使用的core為 c個,那麼對應需要的記憶體消耗為:

第二個,我們也不希望maybespill太耗時,所以 maybespill 方法裡就搞了很多東西,減少耗時。我們看看都設定了哪些防線

首先會判定要不要執行內部邏輯:

elementsread % 32 == 0 && currentmemory >= mymemorythreshold

其中 mymemorythreshold可通過如下配置獲得初始值

spark.shuffle.spill.initialmemorythreshold = 5 * 1024 * 1024

接著會向 shufflememorymanager 要 2 * currentmemory - mymemorythreshold 的記憶體,shufflememorymanager 是被executor 所有正在執行的task(core) 共享的,能夠分配出去的記憶體是:

executorheapmemeory * 0.2 * 0.8

上面的數字可通過下面兩個配置來更改:

spark.shuffle.memoryfraction=0.2spark.shuffle.safetyfraction=0.8

如果無法獲取到足夠的記憶體,就會出發真的spill操作了。

看到這裡,上面的結論就顯而易見了。

然而,這裡我們忽略了乙個很大的問題,就是

estimatedsize = map.estimatesize()

為什麼說它是大問題,前面我們說了,estimatesize 是近似估計,所以有可能估的不准,也就是實際記憶體會遠遠超過預期。

具體的大家尅看看 org.apache.spark.util.collection.sizetracker

我這裡給出乙個結論:

當然,這是一種折中,因為確實不能頻繁取樣。

如果你不想出現這種問題,要麼自己替換實現這個類,要麼將

spark.shuffle.safetyfraction=0.8

設定的更小一些。

shuffle write 記憶體消耗分析

shuffle write 的入口鏈路為:

org.apache.spark.rdd.shuffledrdd—> org.apache.spark.shuffle.sort.hashshufflereader

—> org.apache.spark.util.collection.externalsorter

這裡記憶體占用點和shufflewrite 其實是差不多的。

另外就是需要考慮下拉資料的問題,就是去讀取shuffle write的資料。目前提供了 nio 和 netty的方式。這裡理論上不會出現太大記憶體占用,除非比如觸發了netty的bug等。

Spark shuffle流程細則

hadoop中的shuffle存在map任務和reduce任務之間,而spark中的shuffle過程存在stage之間。shuffle操作分為兩種,分別是寫操作和讀操作。基於排序的shuffle操作 基於雜湊的shuffle操作會產生很多檔案,這對檔案系統來說是乙個非誠大的負擔,而且在總資料量不大...

spark shuffle內在原理說明

在mapreduce框架中,shuffle是連線map和reduce之間的橋梁,map的輸出要用到reduce中必須經過shuffle這個環節,shuffle的效能高低直接影響了整個程式的效能和吞吐量。spark作為mapreduce框架的一種實現,自然也實現了shuffle的邏輯。shuffle是...

spark shuffle內在原理說明

在mapreduce框架中,shuffle是連線map和reduce之間的橋梁,map的輸出要用到reduce中必須經過shuffle這個環節,shuffle的效能高低直接影響了整個程式的效能和吞吐量。spark作為mapreduce框架的一種實現,自然也實現了shuffle的邏輯。shuffle是...