一段程式只能完成功能是沒有用的,只能能夠穩定、高效率地執行才是生成環境所需要的。
本篇記錄了spark各個角度的調優技巧,以備不時之需。
額。。。從最基本的開始講,可能一些剛接觸spark的人不是很清楚spark的一些引數變數到底要配置在**。
可以通過三種方式配置引數,任選其一皆可。
程式中通過sparkconf配置:通過sparkconf物件set方法設定鍵值對,比較直觀。
程式中通過system.setproperty配置:和方法二差不多。
值得一提的是乙個略顯詭異的現象,有些引數在spark-env.sh中配置並不起作用,反而要在程式中設定才有效果。
web ui:即8088埠進入的ui介面。
driver程式日誌:根據程式提交方式的不同到指定的節點上觀察driver程式日誌。
logs資料夾下的日誌:spark集群的大部分資訊都會記錄在這裡。
works資料夾下的日誌:主要記錄work節點的資訊。
profiler工具:沒有使用過。
前景交代完畢,下面進入正題:
1、小分割槽合併的問題
由於程式中過度使用filter運算元或者使用不當,都會造成大量的小分割槽出現。
因為每次過濾得到的結果只有原來資料集的一小部分,而這些量很小的資料同樣會以一定的分割槽數並行化分配到各個節點中執行。
帶來的問題就是:任務處理的資料量很小,反覆地切換任務所消耗的資源反而會帶來很大的系統開銷。
解決方案:使用重分割槽函式coalesce進行資料緊縮、減少分割槽數並設定shuffle=true保證任務是平行計算的
減少分割槽數,雖然意味著並行度降低,但是相對比之前的大量小任務過度切換的消耗,卻是比較值得的。
這裡也可以直接使用repartition重分割槽函式進行操作,因為其底層使用的是coalesce並設定shuffle=true
2、資料傾斜問題
這是乙個生產環境中經常遇到的問題,典型的場景是:大量的資料被分配到小部分節點計算,而其他大部分節點卻只計算小部分資料。
問題產生的原因有很多,可能且不全部包括:
可選的解決方案有:
增大任務數,減少分割槽數量:這種方法和解決小分割槽問題類似。
對特殊的key進行處理,如空值等:直接過濾掉空值的key以免對任務產生干擾。
使用廣播:小資料量直接廣播,大資料量先拆分之後再進行廣播。
還有一種場景是任務執行速度傾斜問題:集群中其他節點都計算完畢了,但是只有少數幾個節點死活執行不完。(其實這和上面的那個場景是差不多的)
解決方案:
3、並行度調整
官方推薦每個cpu core分配2-3個任務。
spark會根據檔案大小預設配置map階段的任務數,所以我們能夠自行調整的就是reduce階段的分割槽數了。
4、dag排程執行優化
dag圖是spark計算的基本依賴,所以建議:
同乙個stage盡量容納更多地運算元,防止多餘的shuffle。
復用已經cache的資料。
盡可能地在transformation運算元中完成對資料的計算,因為過多的action運算元會產生很多多餘的shuffle,在劃分dag圖時會形成眾多stage。
1、大任務分發問題
spark採用akka的actor模型來進行訊息傳遞,包括資料、jar包和相關檔案等。
而akka訊息通訊傳遞預設的容量最大為10m,一旦傳遞的訊息超過這個限制就會出現這樣的錯誤:
worker任務失敗後master上會列印「lost tid:」
根據這個資訊找到對應的worker節點後檢視sparkhome/work/目錄下的日誌,檢視serialized size of result是否超過10m,就可以知道是不是akka這邊的問題了。
一旦確認是akka通訊容量限制之後,就可以通過配置spark.akka.framesize控制akka通訊訊息的最大容量。
2、broadcast在調優場景的使用
broadcast廣播,主要是用於共享spark每個task都會用到的一些唯讀變數。
對於那些每個task都會用到的變數來說,如果每個task都為這些變數分配記憶體空間顯然會使用很多多餘的資源,使用廣播可以有效的避免這個問題,廣播之後,這些變數僅僅會在每台機器上儲存乙份,有task需要使用時就到自己的機器上讀取就ok。
官方推薦,task大於20k時可以使用,可以在控制台上看task的大小。
3、collect結果過大的問題
大量資料時將資料儲存在hdfs上或者其他,不是大量資料,但是超出akka傳輸的buffer大小,通過配置spark.akka.framesize調整。
1、通過序列化手段優化
這裡只簡單介紹一下kryo。
配置引數的時候使用spark.serializer=」org.apache.spark.serializer.kryoserializer」配置
自定義定義可以被kryo序列化的類的步驟:
自定義類extends kryoregistrator
設定序列化方式conf.set(「spark.serializer」,」org.apache.spark.serializer.kryoserializer」)
conf.set(「spark.kyro.registrator」,」自定義的class」)
如果物件占用空間大,需要增加kryo的緩衝區則配置spark.kryoserializer.buffer.mb上值預設為2m
2、通過壓縮手段優化
spark的job大致可以分為兩種:
對於i/o密集型的job,能壓縮就壓縮,因為讀磁碟的時候資料壓縮了,占用空間小了,讀取速度不就快了。
對於cpu密集型的job,看具體cpu使用情況再做決定,因為使用壓縮是需要消耗一些cpu資源的,如果當前cpu已經超負荷了,再使用壓縮反而適得其反。
spark支援兩種壓縮演算法:
spark.broadcast.compress:推薦為true
spark.rdd.compress:預設為false,看情況配置,壓縮花費一些時間,但是可以節省大量記憶體空間
spark.io.compression.codec:org.apache.spark.io.lzfcompressioncodec根據情況選擇壓縮演算法
1、對外部資源的批處理操作
2、reduce和reducebykey
reduce:內部呼叫了runjob方法,是乙個action操作。
reducebykey:內部只是呼叫了combinebykey,是transformation操作。
大量的資料操作時,reduce彙總所有資料到主節點會有效能瓶頸,將資料轉換為key-value的形式使用reducebykey實現邏輯,會做類似mr程式中的combiner的操作,transformation操作分布式進行。
3、shuffle操作符的記憶體使用
使用會觸發shuffle過程的操作符時,操作的資料集合太大造成oom,每個任務執行過程中會在各自的記憶體建立hash表來進行資料分組。
可以解決的方案可能有:
spark 效能調優
核心調優引數如下 num executors executor memory executor cores driver memory spark.default.parallelizm spark.storage.memoryfraction spark.shuffle.memoryfractio...
Spark效能調優
日常工作使用spark處理業務問題中不可避免的都會碰到需要對spark的效能進行調優的情況,這裡就介紹一下對spark的效能調優。1.調節記憶體分配 因為在spark中堆記憶體被劃分為兩塊,一塊是給rdd的cache和persist操作rdd資料快取使用的,另一塊是給spark運算元函式使用的,函式...
Spark效能調優 JVM調優
通過一張圖讓你明白以下四個問題 1.jvm gc機制,堆記憶體的組成 2.spark的調優為什麼會和jvm的調優會有關聯?因為scala也是基於jvm執行的語言 3.spark中oom產生的原因 4.如何在jvm這個層面上來對spark進行調優 補充 spark程式執行時 jvm堆記憶體分配比例 r...