因為之前在tesla集群上申請了1000個節點被噴之後,現在只敢申請100個節點,後來導致乙個嚴重的結果,自從100節點以來再也沒有跑出結果。。。。。。進而意識到了spark優化的重要性,現再次總結一下:
1.乙個通用的調整並行度的方式:首先要明確乙個概念,hadoop為每乙個partition建立乙個task, 我們使用task數來控制並行度,預設情況下是有輸入源(hdfs) block的數量來決定的,因為程式需要保證 datenode 盡量在使用該輸入源的task裡面,所以block決定著input split。所以預設情況可能會造成執行時間過長 或者drive排程壓力過大(資料太瑣碎)
spark.default.parallelism =申請資源cpu cores總和2-3倍(乙個core上執行2-3個task)
2.資料傾斜:在進行shuffle的時候,需要將各個節點上相同的key放到某乙個節點上的task 來解決,比如 groupby 和join 操作都會引起shuffle 操作,如果某乙個key對應的資料量特別大,就會發生資料傾斜,現象就是一些task很快完成,但是個別task會比較慢。這一部分很簡單就能察覺:
duration time 是程式執行的時間,shuffle read 是給task分配的資料量。如果是資料傾斜我們從這兩個上面就可以很明顯的看出來(觀察時間長的task 是不是分到的資料量比較大)
主要注意一下,spark中會有乙個stage劃分的問題,一般是根據shuffle來劃分的,【 只要看到spark**中出現了乙個shuffle類運算元或者是spark sql的sql語句中出現了會導致shuffle的語句(比如group by語句),那麼就可以判定,以那個地方為界限劃分出了前後兩個stage。】,當然,如果你想知道每個key的分布情況,你可以先採集10%的樣本資料,然後countbykey(),最後foreach列印:
具體的解決方法可以參考:
3.乙個rdd在經過一些資料處理之後,例如filter,過濾了較多的資料之後,建議使用coalesce,手動減少rdd的partition的數目,減少排程壓力,提高程式效率。
4.使用高效能的運算元,在保證功能的前提下:
使用reducebykey替代groupbykey的trick,之前已經提到過,下面說一下其他常見的:
foreachpartition代替foreach :與上述的原理相同。
5.對大型變數進行廣播,當內部運算元使用需要使用外部變數的時候,在不broadcast整個變數的時候,drive 就會將運算元中需要的變數按照task 的數目進行拷貝,分給每個task乙份,如果bc之後,則會把這個變數廣播到每個executor中
關於spark的各種引數調整和原理說明 見
6.最後說乙個重點,也是我這次改進的重點,就是在大表和乙個比較小的表join的時候,such as:
usertable:5w個需要進行關聯的user id;
featuretable:百億條的feature資料;
此時,若我們選擇小表 join 大表,所有資料需要進行shuffle ,需要大量的i/o操作,相同的key會放在同乙個partition中。
使用bc會將小表分發到每個executor上面,因此關聯都在本地完成。
具體的**如下所示:
他可以使匹配不上的記錄不返回;
spark效能優化
熟悉spark核心之後,深刻體會到了spark開發中存在著大量細節左右著計算效能。趁著剛看過大概的流程,先基於目前的感受和相關資料,總結一下可能存在優化空間的地方。spark優化其實就是將不必要的開銷能省就省。建立rdd是昂貴的,從磁碟讀取rdd也是昂貴的,需要大量的io開銷,shuffle是與基於...
spark效能優化二
一 task效能優化 1,慢任務的效能優化 可以考慮減少每個partition處理的資料量,同時建議開啟 spark.speculation,2,儘量減少shuffle,例如我們要儘量減少 groupbykey 的操作,因為 groupbykey 會要求通過網路拷貝 shuffle 所有的資料,優先...
spark效能優化八
一 使用tungsten功能 1,如果想讓您的程式使用tungsten的功能,可以配置 spark.shuffle.manager tungsten sort 2,dataframe中自動開啟了 tungsten 功能 二 tungsten sort base shuffle writer內幕 1,...