(1)maybespill方法
protected def maybespill(collection: c, currentmemory: long): boolean =
shouldspill = shouldspill || _elementsread > numelementsforcespillthreshold
// actually spill
if (shouldspill)
shouldspill
}
溢寫時機分析:
當前集合包含的 records 數超過spark.shuffle.spill.numelementsforcespillthreshold
指定值,該值預設大小為long.maxvalue
這種情況複雜些,其判斷流程如下:
主要是在進行 spill 之前會嘗試申請更多的記憶體來存放 records 來避免 spill。
若需要進行 spill,則首先會進行 spill 操作,然後釋放已 spill 的集合對應的記憶體,釋放主要是釋放 execution pool 記憶體以供其他 tasks 使用,並將mymemorythreshold
賦值為初始值spark.shuffle.spill.initialmemorythreshold
對應值,即初始值
(2)溢位寫記憶體取樣演算法
buffer.estimatesize方法計算記憶體(spark shuffle時記憶體**演算法)
sizetracker中的屬性:
private val samples = new mutable.queue[sample]//乙個佇列,用於儲存對資料的取樣樣本
private val sample_growth_rate = 1.1//取樣間隔次數增長率
private var bytesperupdate: double = _//根據samples中最後兩個樣本計算出的記錄記憶體平均增長率
private var numupdates: long = _//更新操作(包括插入和更新)的總次數
private var nextsamplenum: long = _//下一次取樣操作的次數
......
case class sample(size: long, numupdates: long)
·sample_growth_rate:取樣間隔次數增長率,固定值1.1。代表下次抽樣時候更新的次數應該是這次抽樣更新次數的1.1倍,比如上次是更新10000次時候抽樣,下次抽樣就得是更新11000次時候再抽樣,可以避免每次更新都抽樣,減少抽樣花銷。用於計算nextsamplenum的值
·samples:樣本佇列。最後兩個樣本將被用於估算。
·bytesperupdate:記憶體平均增長率 計算公式如下:
·numupdates:更新操作(包括插入和更新)的總次數。當前buffer中的總算子(乙個記錄操作的編號,當然你也可以理解為抽樣集合中的元素個數)
·nextsamplenum:下次取樣時,numupdates的值,即numupdates的值增長到與nextsamplenum相同時,才會再次取樣。可以理解為:代表下次要抽樣的時候集合的個數,就是此次抽樣時候的個數*1.1
了解了sizetracker的屬性,我們就可以更容易理解sizetracker提供的方法了。
1.採集樣本takesample
takesample方法用於採集樣本,其實現如**清單所示。
private def takesample(): unit =
//計算平均增長率
val bytesdelta = samples.tolist.reverse match
bytesperupdate = math.max(0, bytesdelta) // 計算每次更新的位元組數
// 計算下次取樣的取樣號
nextsamplenum = math.ceil(numupdates * sample_growth_rate).tolong
}
/**
* estimate the current size of the collection in bytes. o(1) time.
*/def estimatesize(): long =
這個estimatesize 就是上次的size+增長率*增長量。增長率和size就是上次抽樣得到的。
spill 的操作要考慮到之後要對之後生成的 spill 檔案做 merge,因為最終乙個 shuffle map task 只生成乙個輸出檔案和 index 檔案。
如果是需要做 map 端 combine,spill 時會對 map 中的資料先按 partition id 進行排序,若也提供了 key comparator,則會對屬於同乙個 partition 的 records 按 key 進行排序。做完排序後,先進行序列化再寫入磁碟檔案。
如果是不需要做 map 端的 combine,則只需對 buffer 按 partition id 進行排序即可,不需要對同一partition 的 records 按 key 進行排序。排序後,同樣先序列化,再寫入磁碟檔案。
之後做 merge 時,使用 spillreader 來讀取 spill 資料又要先反序列化,再做最終排序,再寫入最終檔案,這一過程是 shuffle 過程中消耗比較大的一部分。
合併的核心流程如下,由externalsorter#writepartitionedfile(...)
方法實現
其中,最關鍵的 merge 流程如下:
為每個 spill 出來的檔案生成乙個 reader: spillreader,得到readers: seq[spillreader]
(reader 讀取 spilled 檔案要先反序列化)
將記憶體集合進行 buffered,得到 inmembuffered
針對每個 partition p,執行以下操作:
讀懂 diff 命令
diff是unix系統的乙個很重要的工具程式。它用來比較兩個文字檔案的差異,是 版本管理的基石之一。你在命令列下,輸入 1 diff 變動前的檔案 變動後的檔案 diff就會告訴你,這兩個檔案有何差異。它的顯示結果不太好懂,下面我就來說明,如何讀懂diff。一 diff的三種格式 由於歷史原因,di...
讀懂diff命令
diff是unix系統的乙個很重要的工具程式。它用來比較兩個文字檔案的差異,是 版本管理的基石之一。你在命令列下,輸入 1 diff 變動前的檔案 變動後的檔案 diff就會告訴你,這兩個檔案有何差異。它的顯示結果不太好懂,下面我就來說明,如何讀懂diff。一 diff的三種格式 由於歷史原因,di...
如何讀懂DataSheet
如何看datasheet 某些狀態所要維持的最短或最長時間。因為器件的工作速度也是有限的,一般都跟不上主控晶元的速度,所以它們直接之間要有時序配合。這有助於我們對晶元有乙個巨集觀的了解,此時需要弄清楚該晶元的一些比較特殊的功能,充分利用晶元的特殊功能,對整體電路的設計,將會有極大的好處。2.凡是晶元...