資源排程:
(1)executor預設在集群中分散啟動,可通過引數配置集中在某個work啟動,不過分散啟動有利於資料本地化。
(2)如果spark-submit提交任務時,如果不指定--executor-cores,則spark會在每個work中啟動乙個executor並消耗掉work中的所有core和1g的記憶體。
(3)如果只設定--executor-cores而不設定--total-executor-cores則會,每啟動乙個executor耗費--executor-cores配置的核,而且也會消耗掉所有work中的core,直到不能在啟動executor為止。
(4)只有指定--executor-cores並且指定--total-executor-cores,才會限定住executor的個數和每個executor需要的core個數。
任務排程:
按照action運算元劃分job,每個job由dagschedule劃分stage,由finalrdd開始由後向前遞迴劃分stage,劃分stage的關鍵方法是submitstage:
privatedef submitstage(stage: stage)
else
waitingstages +=stage}}
} else
}
每次找到窄依賴會進行壓棧操作,當所有窄依賴都找到完畢以後,會返回missing集合,missing中儲存的是還沒有向內繼續切割的不完整的stage。getmissingparentstages方法如下:
private def getmissingparentstages(stage: stage): list[stage] =case narrowdep: narrowdependency[_] =>waitingforvisit.push(narrowdep.rdd)}}
}}
}waitingforvisit.push(stage.rdd)
while
(waitingforvisit.nonempty)
missing.tolist
}
當stage劃分完畢以後,會將每個stage封裝成task,並最後將task集合傳送給executor去執行。
ps:優化
(1)consolidationfile引數開啟,這樣只在shuffle的第乙個並行階段建立buffer和對應的file,後面的executor執行task不在繼續建立檔案,減少了檔案建立的代價。
(2)上游shufflemaptask輸出檔案以後,下游resulttask拉取資料時,可調整每次拉取資料的多少引數,這個要視情況而定,如果嗎茫然設定過大則可能發生oom的風險。
(3)可調整資料本地化級別引數,調整本地化級別引數的等待時間,如果為了本地化而等待時間過長,拖慢整個spark作業,則也不值得,所以要全橫設定。
(4)多次重複使用的資料要進行persist儲存,以免下次計算。
(5)如果多次使用某些固定資料時,比如字典資料,則需要使用broadcast,這樣每個executor程序中存在乙份,而不會每個task中複製乙份。
(6)使用kryo序列化框架,更快的序列化更好的壓縮
Spark資源排程和任務排程概述
以standalone client模式為例,序列圖如下 圖中 1 6 資源排程 7 11 任務排程 spark資源排程和任務排程的流程 總結 taskscheduler不僅可以重試失敗的task,還可以重新執行緩慢的task,這是spark中的推測執行機制,預設關閉,對於資料清洗的場景要關閉,防止...
spark的資源排程和任務排程以及粗細粒度資源申請
taskschedule是任務排程的低層排程器,這裡taskset其實就是乙個集合,裡面封裝的就是乙個個task任務,也就是stage中的並行度task任務 taskscheduler 不僅能重試失敗的 task,還會重試 straggling 落後,緩慢 task 也就是執行速度比其他 task ...
Spark的資源排程
7 加深理解 val works new hashset workinfo val waitingdrivers new arraybuffer driverinfo 可能直接看下面的知識點會有點迷惑,若不理解可以結合第三部分的流程圖一起看 works 集合採用hashset陣列儲存work的節點資...