在map和reduce之間的過程就是shuffle,shuffle的效能直接影響整個spark的效能。所以shuffle至關重要。
從圖中得知,map輸出的結構產生在bucket中。而bucket的數量是map*reduce的個數。這裡的每乙個bucket都對應乙個檔案。map對bucket書是寫入資料,而reduce是對bucket是抓取資料也就是讀的過程。
在spark1.1.1中shuffle過程的處理交給了shuffleblockmanager來管理。
shufflemanager中有四個方法:
1)registershuffleshuffle註冊
2)getwriter獲得寫資料的物件
3)getreader獲得讀取資料的物件
4)unregistershuffle移除元資料
5)stop 停止shufflemanager
shufflemanager有兩個子類:
shuffle寫的過程需要落地磁碟。在引數
中可以配置。
接下來看下write的具體方法
如果consolidateshufflefiles為true寫檔案,為false在completedmaptasks中新增mapid。
接下來看下recyclefilegroup這個方法。引數shufflefilegroup是一組shuffle檔案,每乙個特定的map都會分配一組shufflefilegroup寫入檔案。**如下:
這裡的valunusedfilegroups = new concurrentlinkedqueue[shufflefilegroup]()是乙個鍊錶佇列。往佇列中新增shufflefilegroup
而在shufflestate.comletedmaptasks這個方法則是往bucket中填充,如果consolidateshufflefiles為false,則不需要管他。原始碼中也是這樣解釋completedmaptasks這個佇列:
原始碼中的shufflestate是記錄shuffle的乙個特定狀態。
shufflewrite有兩個子類:
hashshufflewriter中的寫方法:
再來看下sortshufflewriter的write方法:
sortshufflemanager中的讀取物件呼叫了hashshufflereader
在spark1.1.1原始碼中sortshufflemanager壓根就沒實現。
在rdd api中當呼叫reducebykey等類似的操作,則會產生shuffle了。
根據不同的業務場景,reduce的個數一般由程式猿自己設定大小。可通過「spark.default.par allelism」引數設定。
2、shuffledrdd主要是做從這個抓取資料的工作。
4、步驟1和步驟3都會涉及到spill的過程。
在作業提交的時候,dagschuduler會把shuffle的成過程切分成map和reduce兩個部分。每個部分的任務聚合成乙個stage。
shuffle在map端的時候通過shufflemaptask的runtask方法執行task。
shufflemaptask結束之後,最後走到dagscheduler的handletaskcompletion方法當中原始碼如下:
stage結束後,到reduce抓取過程。檢視blockstoreshufflefetcher原始碼如下:
Spark原始碼走讀2 Spark Submit
這裡主要說明作業提交的過程原始碼。sparksubmit在org.apache.spark.deploy中,submit是乙個單獨的程序,首先檢視它的main方法 createlaunchenv方法中設定了一些配置引數 如返回值 集群模式 執行環境等。這裡主要檢視client的集群模式。下面看下作業...
kube scheduler原始碼走讀
螞蟻金服kubernetes方向招聘 kube scheduler是k8s中相對比較簡單的乙個服務,它監聽api server獲取新建的pod,從眾多的node中選擇乙個合適的,來執行該pod。選擇的過程分兩個階段 預選階段 優選階段 本文簡單的跟進一下kube scheduler執行的整個流程。入...
pytest原始碼走讀 低版本1 0 0b6
基礎資訊 之後的走讀筆記 如果沒有特殊說明,均針對pytest的1.0.0b6 release版本 早期走讀選擇合適版本的標準 1 非最新版本 往往幾百個 貢獻者寫出來的 很難看懂 2 找可執行的老版本,比如1.0.0b6 或者1.0.0b3 往往是乙個,兩個貢獻者寫的,因為是release版本,基...