技術難點 Spark效能調優 RDD運算元調優篇

2021-10-21 02:32:02 字數 2921 閱讀 7704

不廢話,直接進入正題!

1. rdd復用

在對rdd進行運算元時,要避免相同的運算元和計算邏輯之下對rdd進行重複的計算,如下圖所示:

對上圖中的rdd計算架構進行修改,得到如下圖所示的優化結果:

2. 盡早filter

獲取到初始rdd後,應該考慮盡早地過濾掉不需要的資料,進而減少對記憶體的占用,從而提公升spark作業的執行效率。

3. 讀取大量小檔案-用wholetextfiles

當我們將乙個文字檔案讀取為 rdd 時,輸入的每一行都會成為rdd的乙個元素。

也可以將多個完整的文字檔案一次性讀取為乙個pairrdd,其中鍵是檔名,值是檔案內容。

val input:rdd[string] = sc.textfile("dir/*.log")
如果傳遞目錄,則將目錄下的所有檔案讀取作為rdd。檔案路徑支援萬用字元。

但是這樣對於大量的小檔案讀取效率並不高,應該使用wholetextfiles

返回值為rdd[(string, string)],其中key是檔案的名稱,value是檔案的內容。

def wholetextfiles(path: string, minpartitions: int = defaultminpartitions): rdd[(string, string)])
wholetextfiles讀取小檔案:

val filesrdd: rdd[(string, string)] =

sc.wholetextfiles("d:\\data\\files", minpartitions = 3)

val linesrdd: rdd[string] = filesrdd.flatmap(_._2.split("\\r\\n"))

val wordsrdd: rdd[string] = linesrdd.flatmap(_.split(" "))

wordsrdd.map((_, 1)).reducebykey(_ + _).collect().foreach(println)

map(_….)  表示每乙個元素

如果是普通的map運算元,假設乙個partition有1萬條資料,那麼map運算元中的function要執行1萬次,也就是對每個元素進行操作。

rrd.foreache(_….) 表示每乙個元素

rrd.forpartitions(_….)  表示每個分割槽的資料組成的迭代器

在生產環境中,通常使用foreachpartition運算元來完成資料庫的寫入,通過foreachpartition運算元的特性,可以優化寫資料庫的效能。

如果使用foreach運算元完成資料庫的操作,由於foreach運算元是遍歷rdd的每條資料,因此,每條資料都會建立乙個資料庫連線,這是對資源的極大浪費,因此,對於寫資料庫操作,我們應當使用foreachpartition運算元

使用了foreachpartition 運算元後,可以獲得以下的效能提公升:

對於我們寫的function函式,一次處理一整個分割槽的資料;

對於乙個分區內的資料,建立唯一的資料庫連線;

只需要向資料庫傳送一次sql語句和多組引數;

5. filter+coalesce/repartition(減少分割槽)

在spark任務中我們經常會使用filter運算元完成rdd中資料的過濾,在任務初始階段,從各個分割槽中載入到的資料量是相近的,但是一旦進過filter過濾後,每個分割槽的資料量有可能會存在較大差異,如下圖所示:

根據上圖我們可以發現兩個問題:

每個partition的資料量變小了,如果還按照之前與partition相等的task個數去處理當前資料,有點浪費task的計算資源;

每個partition的資料量不一樣,會導致後面的每個task處理每個partition資料的時候,每個task要處理的資料量不同,這很有可能導致資料傾斜問題。

如上圖所示,第二個分割槽的資料過濾後只剩100條,而第三個分割槽的資料過濾後剩下800條,在相同的處理邏輯下,第二個分割槽對應的task處理的資料量與第三個分割槽對應的task處理的資料量差距達到了8倍,這也會導致執行速度可能存在數倍的差距,這也就是資料傾斜問題

針對上述的兩個問題,我們分別進行分析:

針對第乙個問題,既然分割槽的資料量變小了,我們希望可以對分割槽資料進行重新分配,比如將原來4個分割槽的資料轉化到2個分割槽中,這樣只需要用後面的兩個task進行處理即可,避免了資源的浪費。

針對第二個問題,解決方法和第乙個問題的解決方法非常相似,對分割槽資料重新分配,讓每個partition中的資料量差不多,這就避免了資料傾斜問題。

那麼具體應該如何實現上面的解決思路?我們需要coalesce運算元。

repartition與coalesce都可以用來進行重分割槽,其中repartition只是coalesce介面中shuffle為true的簡易實現,coalesce預設情況下不進行shuffle,但是可以通過引數進行設定。

假設我們希望將原本的分割槽個數a通過重新分割槽變為b,那麼有以下幾種情況:

a > b(多數分割槽合併為少數分割槽)

spark 效能調優

核心調優引數如下 num executors executor memory executor cores driver memory spark.default.parallelizm spark.storage.memoryfraction spark.shuffle.memoryfractio...

Spark效能調優

日常工作使用spark處理業務問題中不可避免的都會碰到需要對spark的效能進行調優的情況,這裡就介紹一下對spark的效能調優。1.調節記憶體分配 因為在spark中堆記憶體被劃分為兩塊,一塊是給rdd的cache和persist操作rdd資料快取使用的,另一塊是給spark運算元函式使用的,函式...

Spark效能調優 JVM調優

通過一張圖讓你明白以下四個問題 1.jvm gc機制,堆記憶體的組成 2.spark的調優為什麼會和jvm的調優會有關聯?因為scala也是基於jvm執行的語言 3.spark中oom產生的原因 4.如何在jvm這個層面上來對spark進行調優 補充 spark程式執行時 jvm堆記憶體分配比例 r...