多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。
常見於各種shuffle操作,例如reducebykey,groupbykey,join等操作。
key本身分布不均勻(包括大量的key為空)
key的設定不合理
shuffle時的併發度不夠
計算方式有誤
spark中乙個stage的執行時間受限於最後那個執行完的task,因此執行緩慢的任務會拖累整個程式的執行速度(分布式程式執行的速度是由最慢的那個task決定的)。
過多的資料在同乙個task中執行,將會把executor撐爆,造成oom,程式終止執行。
乙個理想的分布式程式:
發生資料傾斜時,任務的執行速度由最大的那個任務決定:
發現資料傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。
如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個
df.select("key").sample(false,0.1).(k=>(k,1)).reducebykey(_+_).map(k=>(k._2,k._1)).sortbykey(false).take(10)
如果發現多數資料分布都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。
經過分析,傾斜的資料主要有以下三種情況:
null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起。
無效資料,大量重複的測試資料或是對結果影響不大的有效資料。
有效資料,業務導致的正常資料分布。
第1,2種情況,直接對資料進行過濾即可。
第3種情況則需要進行一些特殊操作,常見的有以下幾種做法。
隔離執行,將異常的key過濾出來單獨處理,最後與正常資料的處理結果進行union操作。
對key先新增隨機值,進行操作後,去掉隨機值,再進行一次操作。
使用reducebykey
代替groupbykey
使用map join。
如果使用reducebykey
因為資料傾斜造成執行失敗的問題。具體操作如下:
將原始的key
轉化為key + 隨機值
(例如random.nextint)
對資料進行reducebykey(func)
將key + 隨機值
轉成key
再對資料進行reducebykey(func)
tip1: 如果此時依舊存在問題,建議篩選出傾斜的資料單獨處理。最後將這份資料與正常的資料進行union即可。
tips2: 單獨處理異常資料時,可以配合使用map join解決。
dataframe
和sparksql
可以設定spark.sql.shuffle.partitions
引數控制shuffle的併發度,預設為200。
rdd操作可以設定spark.default.parallelism
控制併發度,預設引數由不同的cluster manager控制。
侷限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使乙個task單獨執行它,也會受到資料傾斜的困擾。
在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程式避免shuffle的過程,自然也就沒有資料傾斜的困擾了。
侷限性: 因為是先將小資料傳送到每個executor上,所以資料量不能太大。
具體使用方法和處理流程參照:
spark map-side-join 關聯優化
spark join broadcast優化
解決 spark 中的資料傾斜問題
發現資料傾斜的時候,不要急於提高 executor 的資源,修改引數 或是修改程式,首先要檢查資料本身,是否存在異常資料。1 資料問題造成的資料傾斜 找出異常的 key 如果任務長時間卡在最後最後 1 個 幾個 任務,首先要對 key 進行 抽樣分析,判斷是哪些 key 造成的。選取 key,對資料...
spark解決資料傾斜問題
參考 資料傾斜發生的原理 資料傾斜的原理很簡單 在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的乙個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。比如大部分key對應10條資料,但是個別key卻對應了...
Spark 資料傾斜
計算資料時,資料分散度不夠,導致大量資料集中到一台或幾台機器上計算。區域性計算遠低於平均計算速度,整個過程過慢。部分任務處理資料量過大,可能oom,任務失敗,進而應用失敗。1 executor lost driver oom shuffle過程出錯 2 正常執行任務突然失敗 3 單個executor...