map(_....)//_表示每乙個元素
rrd.foreache(_....)//_表示每乙個元素
rrd.forpartitions(_....)//_表示每個分割槽的資料組成的迭代器
在生產環境中,通常使用foreachpartition運算元來完成資料庫的寫入,通過foreachpartition運算元的特性,可以優化寫資料庫的效能。
如果使用foreach運算元完成資料庫的操作,由於foreach運算元是遍歷rdd的每條資料,因此,每條資料都會建立乙個資料庫連線,這是對資源的極大浪費,因此,對於寫資料庫操作,我們應當使用foreachpartition運算元。
使用了foreachpartition運算元後,可以獲得以下的效能提公升:
1. 對於我們寫的function函式,一次處理一整個分割槽的資料;
2. 對於乙個分區內的資料,建立唯一的資料庫連線;
3. 只需要向資料庫傳送一次sql語句和多組引數;
在spark任務中我們經常會使用filter運算元完成rdd中資料的過濾,在任務初始階段,從各個分割槽中載入到的資料量是相近的,但是一旦進過filter過濾後,每個分割槽的資料量有可能會存在較大差異
我們可以發現兩個問題:
1. 每個partition的資料量變小了,如果還按照之前與partition相等的task個數去處理當前資料,有點浪費task的計算資源;
2. 每個partition的資料量不一樣,會導致後面的每個task處理每個partition資料的時候,每個task要處理的資料量不同,這很有可能導致資料傾斜問題。
如圖2-6所示,第二個分割槽的資料過濾後只剩100條,而第三個分割槽的資料過濾後剩下800條,在相同的處理邏輯下,第二個分割槽對應的task處理的資料量與第三個分割槽對應的task處理的資料量差距達到了8倍,這也會導致執行速度可能存在數倍的差距,這也就是資料傾斜問題。
針對上述的兩個問題,我們分別進行分析:
1. 針對第乙個問題,既然分割槽的資料量變小了,我們希望可以對分割槽資料進行重新分配,比如將原來4個分割槽的資料轉化到2個分割槽中,這樣只需要用後面的兩個task進行處理即可,避免了資源的浪費。
2. 針對第二個問題,解決方法和第乙個問題的解決方法非常相似,對分割槽資料重新分配,讓每個partition中的資料量差不多,這就避免了資料傾斜問題。
那麼具體應該如何實現上面的解決思路?我們需要coalesce運算元。
repartition與coalesce都可以用來進行重分割槽,其中repartition只是coalesce介面中shuffle為true的簡易實現,coalesce預設情況下不進行shuffle,但是可以通過引數進行設定。
假設我們希望將原本的分割槽個數a通過重新分割槽變為b,那麼有以下幾種情況:
a > b(多數分割槽合併為少數分割槽)
① a與b相差值不大
此時使用coalesce即可,無需shuffle過程。
② a與b相差值很大
此時可以使用coalesce並且不啟用shuffle過程,但是會導致合併過程效能低下,所以推薦設定coalesce的第二個引數為true,即啟動shuffle過程。
a < b(少數分割槽分解為多數分割槽)
此時使用repartition即可,如果使用coalesce需要將shuffle設定為true,否則coalesce無效。
我們可以在filter操作之後,使用coalesce運算元針對每個partition的資料量各不相同的情況,壓縮partition的數量,而且讓每個partition的資料量盡量均勻緊湊,以便於後面的task進行計算操作,在某種程度上能夠在一定程度上提公升效能。
注意:local模式是程序內模擬集群執行,已經對並行度和分割槽數量有了一定的內部優化,因此不用去設定並行度和分割槽數量。
在第一節的常規效能調優中我們講解了並行度的調節策略,但是,並行度的設定對於spark sql是不生效的,使用者設定的並行度只對於spark sql以外的所有spark的stage生效。
spark sql的並行度不允許使用者自己指定,spark sql自己會預設根據hive表對應的hdfs檔案的split個數自動設定spark sql所在的那個stage的並行度,使用者自己通spark.default.parallelism引數指定的並行度,只會在沒spark sql的stage中生效。
由於spark sql所在stage的並行度無法手動設定,如果資料量較大,並且此stage中後續的transformation操作有著複雜的業務邏輯,而spark sql自動設定的task數量很少,這就意味著每個task要處理為數不少的資料量,然後還要執行非常複雜的處理邏輯,這就可能表現為第乙個有spark sql的stage速度很慢,而後續的沒有spark sql的stage執行速度非常快。
為了解決spark sql無法設定並行度和task數量的問題,我們可以使用repartition運算元。
spark sql這一步的並行度和task數量肯定是沒有辦法去改變了,但是,對於spark sql查詢出來的rdd,立即使用repartition運算元,去重新進行分割槽,這樣可以重新分割槽為多個partition,從repartition之後的rdd操作,由於不再設計spark sql,因此stage的並行度就會等於你手動設定的值,這樣就避免了spark sql所在的stage只能用少量的task去處理大量資料並執行複雜的演算法邏輯。
reducebykey相較於普通的shuffle操作乙個顯著的特點就是會進行map端的本地聚合,map端會先對本地的資料進行combine操作,然後將資料寫入給下個stage的每個task建立的檔案中,也就是在map端,對每乙個key對應的value,執行reducebykey運算元函式。reducebykey運算元的執行過程如圖2-8所示:
使用reducebykey對效能的提公升如下:
本地聚合後,在map端的資料量變少,減少了磁碟io,也減少了對磁碟空間的占用;
本地聚合後,下乙個stage拉取的資料量變少,減少了網路傳輸的資料量;
本地聚合後,在reduce端進行資料快取的記憶體占用減少;
本地聚合後,在reduce端進行聚合的資料量減少。
基於reducebykey的本地聚合特徵,我們應該考慮使用reducebykey代替其他的shuffle運算元,例如groupbykey。reducebykey與groupbykey的執行原理
Spark效能調優九之常用運算元調優
經過一次filter操作以後,每個partition的資料量不同程度的變少了,這裡就出現了乙個問題 由於每個partition的資料量不一樣,出現了資料傾斜的問題。比如上圖中執行filter之後的第乙個partition的資料量還有9000條。解決方案 針對上述出現的問題,我們可以將filter操作...
技術難點 Spark效能調優 RDD運算元調優篇
不廢話,直接進入正題!1.rdd復用 在對rdd進行運算元時,要避免相同的運算元和計算邏輯之下對rdd進行重複的計算,如下圖所示 對上圖中的rdd計算架構進行修改,得到如下圖所示的優化結果 2.盡早filter 獲取到初始rdd後,應該考慮盡早地過濾掉不需要的資料,進而減少對記憶體的占用,從而提公升...
Spark效能調優 JVM調優
通過一張圖讓你明白以下四個問題 1.jvm gc機制,堆記憶體的組成 2.spark的調優為什麼會和jvm的調優會有關聯?因為scala也是基於jvm執行的語言 3.spark中oom產生的原因 4.如何在jvm這個層面上來對spark進行調優 補充 spark程式執行時 jvm堆記憶體分配比例 r...