1.資料傾斜發生時的現象
絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有1000個task,997個task都在1分鐘之內執行完了,但是剩餘兩三個task卻要一兩個小時。這種情況很常見。
原本能夠正常執行的spark作業,某天突然報出oom(記憶體溢位)異常,觀察異常棧,是我們寫的業務**造成的。這種情況比較少見。
2.資料傾斜發生的原理
資料傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的乙個task來進行處理,比如按照key進行 聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。比如大部分key對應10條資料,但是個別key卻對應了100萬 條資料,那麼大部分task可能就只會分配到10條資料,然後1秒鐘就執行完了;但是個別task可能分配到了100萬資料,要執行一兩個小時。因此,整 個spark作業的執行進度是由執行時間最長的那個task決定的。
3.解決方案一:使用hive etl預處理資料
方案適用場景:導致資料傾斜的是hive表。如果該hive表中的資料本身很不均勻(比如某個key對應了100萬資料,其他key才對應了10條資料),而且業務場景需要頻繁使用spark對hive表執行某個分析操作,那麼比較適合使用這種技術方案。
方案實現思路:此時可以評估一下,是否可以通過hive來進行資料預處理(即通過hive etl預先對資料按照key進行聚合,或者是預先和其他表進行join),然後在spark作業中針對的資料來源就不是原來的hive表了,而是預處理後的 hive表。此時由於資料已經預先進行過聚合或join操作了,那麼在spark作業中也就不需要使用原先的shuffle類運算元執行這類操作了。
方案實現原理:這種方案從根源上解決了資料傾斜,因為徹底避免了在spark中執行shuffle類運算元,那麼 肯定就不會有資料傾斜的問題了。但是這裡也要提醒一下大家,這種方式屬於治標不治本。因為畢竟資料本身就存在分布不均勻的問題,所以hive etl中進行group by或者join等shuffle操作時,還是會出現資料傾斜,導致hive etl的速度很慢。我們只是把資料傾斜的發生提前到了hive etl中,避免spark程式發生資料傾斜而已。
方案優點:實現起來簡單便捷,效果還非常好,完全規避掉了資料傾斜,spark作業的效能會大幅度提公升。
方案缺點:治標不治本,hive etl中還是會發生資料傾斜。
4.解決方案二:過濾少數導致傾斜的key
方案適用場景:如果發現導致傾斜的key就少數幾個,而且對計算本身的影響並不大的話,那麼很適合使用這種方案。比如99%的key就對應10條資料,但是只有乙個key對應了100萬資料,從而導致了資料傾斜。
方案實現思路:如果我們判斷那少數幾個資料量特別多的key,對作業的執行和計算結果不是特別重要的話,那麼幹 脆就直接過濾掉那少數幾個key。比如,在spark sql中可以使用where子句過濾掉這些key或者在spark core中對rdd執行filter運算元過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的資料量最多然後再進行過濾,那麼可以使用 sample運算元對rdd進行取樣,然後計算出每個key的數量,取資料量最多的key過濾掉即可。
方案實現原理:將導致資料傾斜的key給過濾掉之後,這些key就不會參與計算了,自然不可能產生資料傾斜。
方案優點:實現簡單,而且效果也很好,可以完全規避掉資料傾斜。
方案缺點:適用場景不多,大多數情況下,導致傾斜的key還是很多的,並不是只有少數幾個。
5.解決方案三:提高shuffle操作的並行度
方案適用場景:如果我們必須要對資料傾斜迎難而上,那麼建議優先使用這種方案,因為這是處理資料傾斜最簡單的一種方案。
方案實現思路:在對rdd執行shuffle運算元時,給shuffle運算元傳入乙個引數,比如 reducebykey(1000),該引數就設定了這個shuffle運算元執行時shuffle read task的數量。對於spark sql中的shuffle類語句,比如group by、join等,需要設定乙個引數,即spark.sql.shuffle.partitions,該引數代表了shuffle read task的並行度,該值預設是200,對於很多場景來說都有點過小。
方案實現原理:增加shuffle read task的數量,可以讓原本分配給乙個task的多個key分配給多個task,從而讓每個task處理比原來更少的資料。舉例來說,如果原本有5個 key,每個key對應10條資料,這5個key都是分配給乙個task的,那麼這個task就要處理50條資料。而增加了shuffle read task以後,每個task就分配到乙個key,即每個task就處理10條資料,那麼自然每個task的執行時間都會變短了。具體原理如下圖所示。
方案優點:實現起來比較簡單,可以有效緩解和減輕資料傾斜的影響。
方案缺點:只是緩解了資料傾斜而已,沒有徹底**問題,根據實踐經驗來看,其效果有限。
6.解決方案四:兩階段聚合(區域性聚合+全域性聚合)
方案適用場景:對rdd執行reducebykey等聚合類shuffle運算元或者在spark sql中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是區域性聚合,先給每個key都打上乙個隨機 數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的資料,執行reducebykey等聚合操作,進行區域性聚合,那麼區域性聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然後將各個key的字首給去掉,就會變成(hello,2)(hello,2),再次進行全域性聚合操作,就可以得到最終結果了,比如(hello, 4)。
方案實現原理:將原本相同的key通過附加隨機字首的方式,變成多個不同的key,就可以讓原本被乙個task處理的資料分散到多個task上去做區域性聚合,進而解決單個task處理資料量過多的問題。接著去除掉隨機字首,再次進行全域性聚合,就可以得到最終的結果。具體原理見下圖。
方案優點:對於聚合類的shuffle操作導致的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,將spark作業的效能提公升數倍以上。
方案缺點:僅僅適用於聚合類的shuffle操作,適用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
Spark資料傾斜調優
一 資料傾斜發生的原理 1 確定資料傾斜發生在第幾個stage中。可以通過spark web ui來檢視當前執行到了第幾個stage。並深入看一下當前這個stage各個task分配的資料量及執行時間 2 根據stage劃分原理,推算出來發生傾斜的那個stage對應 中的哪一部分。3 分析一下那個執行...
spark調優之資料傾斜
問題出現的原因 資料傾斜的表現 資料傾斜的表現 遇到這種方式問題莫慌看思路 問題 資料傾斜會導致資料溢位,可能是其中的某個task分配了大量的資料,執行出錯,導致資料傾斜,資料溢位.1.方案一 聚合源資料 只針對常見的聚合操作的情況 2.方案二 使用過濾的方法 只針對只使用部分資料的情況 3.方案三...
spark 效能調優
核心調優引數如下 num executors executor memory executor cores driver memory spark.default.parallelizm spark.storage.memoryfraction spark.shuffle.memoryfractio...