讀懂Sort Based Shuffle溢寫過程

2021-10-19 21:21:14 字數 3344 閱讀 3050

(1)maybespill方法

protected def maybespill(collection: c, currentmemory: long): boolean = 

shouldspill = shouldspill || _elementsread > numelementsforcespillthreshold

// actually spill

if (shouldspill)

shouldspill

}

溢寫時機分析:

當前集合包含的 records 數超過spark.shuffle.spill.numelementsforcespillthreshold指定值,該值預設大小為long.maxvalue這種情況複雜些,其判斷流程如下:

主要是在進行 spill 之前會嘗試申請更多的記憶體來存放 records 來避免 spill。

若需要進行 spill,則首先會進行 spill 操作,然後釋放已 spill 的集合對應的記憶體,釋放主要是釋放 execution pool 記憶體以供其他 tasks 使用,並將mymemorythreshold賦值為初始值spark.shuffle.spill.initialmemorythreshold對應值,即初始值

(2)溢位寫記憶體取樣演算法

buffer.estimatesize方法計算記憶體(spark shuffle時記憶體**演算法

sizetracker中的屬性:

private val samples = new mutable.queue[sample]//乙個佇列,用於儲存對資料的取樣樣本

private val sample_growth_rate = 1.1//取樣間隔次數增長率

private var bytesperupdate: double = _//根據samples中最後兩個樣本計算出的記錄記憶體平均增長率

private var numupdates: long = _//更新操作(包括插入和更新)的總次數

private var nextsamplenum: long = _//下一次取樣操作的次數

......

case class sample(size: long, numupdates: long)

·sample_growth_rate:取樣間隔次數增長率,固定值1.1。代表下次抽樣時候更新的次數應該是這次抽樣更新次數的1.1倍,比如上次是更新10000次時候抽樣,下次抽樣就得是更新11000次時候再抽樣,可以避免每次更新都抽樣,減少抽樣花銷。用於計算nextsamplenum的值

·samples:樣本佇列。最後兩個樣本將被用於估算。

·bytesperupdate:記憶體平均增長率   計算公式如下:

·numupdates:更新操作(包括插入和更新)的總次數。當前buffer中的總算子(乙個記錄操作的編號,當然你也可以理解為抽樣集合中的元素個數

·nextsamplenum:下次取樣時,numupdates的值,即numupdates的值增長到與nextsamplenum相同時,才會再次取樣。可以理解為:代表下次要抽樣的時候集合的個數,就是此次抽樣時候的個數*1.1

了解了sizetracker的屬性,我們就可以更容易理解sizetracker提供的方法了。

1.採集樣本takesample

takesample方法用於採集樣本,其實現如**清單所示。

private def takesample(): unit = 

//計算平均增長率

val bytesdelta = samples.tolist.reverse match

bytesperupdate = math.max(0, bytesdelta) // 計算每次更新的位元組數

// 計算下次取樣的取樣號

nextsamplenum = math.ceil(numupdates * sample_growth_rate).tolong

}

/**

* estimate the current size of the collection in bytes. o(1) time.

*/def estimatesize(): long =

這個estimatesize 就是上次的size+增長率*增長量。增長率和size就是上次抽樣得到的。

spill 的操作要考慮到之後要對之後生成的 spill 檔案做 merge,因為最終乙個 shuffle map task 只生成乙個輸出檔案和 index 檔案。

如果是需要做 map 端 combine,spill 時會對 map 中的資料先按 partition id 進行排序,若也提供了 key comparator,則會對屬於同乙個 partition 的 records 按 key 進行排序。做完排序後,先進行序列化再寫入磁碟檔案。

如果是不需要做 map 端的 combine,則只需對 buffer 按 partition id 進行排序即可,不需要對同一partition 的 records 按 key 進行排序。排序後,同樣先序列化,再寫入磁碟檔案。

之後做 merge 時,使用 spillreader 來讀取 spill 資料又要先反序列化,再做最終排序,再寫入最終檔案,這一過程是 shuffle 過程中消耗比較大的一部分。

合併的核心流程如下,由externalsorter#writepartitionedfile(...)方法實現

其中,最關鍵的 merge 流程如下:

為每個 spill 出來的檔案生成乙個 reader: spillreader,得到readers: seq[spillreader](reader 讀取 spilled 檔案要先反序列化)

將記憶體集合進行 buffered,得到 inmembuffered

針對每個 partition p,執行以下操作:

讀懂 diff 命令

diff是unix系統的乙個很重要的工具程式。它用來比較兩個文字檔案的差異,是 版本管理的基石之一。你在命令列下,輸入 1 diff 變動前的檔案 變動後的檔案 diff就會告訴你,這兩個檔案有何差異。它的顯示結果不太好懂,下面我就來說明,如何讀懂diff。一 diff的三種格式 由於歷史原因,di...

讀懂diff命令

diff是unix系統的乙個很重要的工具程式。它用來比較兩個文字檔案的差異,是 版本管理的基石之一。你在命令列下,輸入 1 diff 變動前的檔案 變動後的檔案 diff就會告訴你,這兩個檔案有何差異。它的顯示結果不太好懂,下面我就來說明,如何讀懂diff。一 diff的三種格式 由於歷史原因,di...

如何讀懂DataSheet

如何看datasheet 某些狀態所要維持的最短或最長時間。因為器件的工作速度也是有限的,一般都跟不上主控晶元的速度,所以它們直接之間要有時序配合。這有助於我們對晶元有乙個巨集觀的了解,此時需要弄清楚該晶元的一些比較特殊的功能,充分利用晶元的特殊功能,對整體電路的設計,將會有極大的好處。2.凡是晶元...