spark的資料傾斜調優方案歸納總結:
不來虛的,直接上解決方法。
資料傾斜產生原因:在運算過程中把資料分配給不同的task,一般需要shuffle過程,同乙個key都會交給task處理,但是有時同乙個key的values資料量太多造成資料堆積等。
判斷是否發生資料傾斜:通過web ui檢視task的metrics某些task相當於其他大多數task來說所消耗相當長的時間。
資料傾斜解決方案:
(1)對源資料進行聚合並過濾掉導致傾斜的keys。
對源資料進行聚合並過濾掉導致傾斜的keys:資料來源記錄中導致資料傾斜的key可能只有幾個,這些key不影響業務的計算,匯聚後對應的value值卻很多,可能導致資料傾斜。
可使用filter運算元過濾掉傾斜的key值;
若使用hive工具,可以將資料傾斜的操作前移到hive中進行,在hive中對資料傾斜進行預處理,從資料根源上解決資料傾斜;
若在spark sql中發生傾斜,可使用where/filter運算元過濾,然後再進行groupby等操作。或採用sample運算元,對dataframe的key值取樣,若不是業務需要,使用where/filter直接過濾掉。
(2)適當提高reducer端的並行度(並行度指的是rdd的分割槽數)。
改善並行度之所以改變資料傾斜的原因:若某個task有100個key且資料量特別大,就有可能導致oom(out of memory)或任務執行特別慢,若把並行度調大,則可分解該task的資料量。
2.1對於reducebykey可傳入並行度引數numpartitions: int,該引數設定了shuffle運算元執行shuffle read task的數量;
2.2 亦可自定義partitioner,增加executor改變計算資源。
(3)使用隨機key實現雙重聚合
隨機key雙重集合:指spark分布式計算對rdd呼叫reducebykey各運算元,使用對key值隨機數字首的處理技巧,對key值進行二次聚合。第一次聚合(區域性聚合)是對每個key值加上乙個隨機數,執行第一次reducebykey聚合操作;第二次聚合(雙重聚合)是指去掉key值的字首隨機數,執行第二次reducebykey聚合,得到最終結果。
原理:使用隨機key實現雙重聚合,原理剖析運算元reducebykey。假設有資料傾斜,則給所有的key加上乙個隨機數,然後進行reducebykey操作,原來非常大的傾斜的key就分而治之變成若干個較小的key,不過此時結果和原來不一樣,則使用map操作把隨機數字首去掉,然後再次進行reducebykey操作。
注意事項:使用隨機key雙重聚合侷限於單個rdd的reducebykey、groupbykey等運算元。若兩個rdd的資料量都特別大,而且傾斜的key特別多,就無法使用隨機key雙重聚合方案。
場景應用:若兩個rdd進行操作,其中乙個rdd資料量較小,就把此rdd的資料以廣播變數的形式散發給各個executor,這樣就可與另乙個rdd進行map操作了。
條件:要進行join的rdd,其中乙個rdd資料量比較小。
(5)對傾斜keys取樣後進行單獨的join操作
若果兩個rdd的資料量都比較大,則考慮使用取樣方案。
在spark中可直接使用取樣演算法sample找出哪個rdd導致資料傾斜,sample演算法取樣一般不超過原資料的30%
對傾斜的keys進行取樣步驟:
rdd1和rdd2進行join操作,採用取樣的方式返現rdd1有嚴重的資料傾斜的key。
第一步:採用rdd提供的取樣介面,對rdd1全部原資料進行取樣,然後基於取樣的資料可以計算出哪些key的values值最多;
第二步:把全部資料分為兩部分rdd11和rdd12,rdd11代表導致傾斜的key,rdd12是不會發生資料傾斜的key;
第三步:rdd11和rdd12分別於rdd2進行join操作,然後把join的結果進行union操作,從而得出rdd1與rdd2直接join的結果。
原理:rdd12與rdd2進行join操作不會發生資料傾斜,因為兩個rdd中的key的values值都不多,rdd11和rdd2進行join操作,利用spark core的並行機制對rdd11的key的資料進行拆分。
注意事項:對rdd1進行sample取樣計算出乙個結果,取樣後進行map操作,通過reducebykey操作計數,然後對key和value進行置換,通過sortbykey進行排序,再進行map置換操作,從而找出哪個key值傾斜嚴重,對其進行過濾,提取到rdd11中,沒有發生傾斜的提取到rdd12中。
(6)使用隨機數進行join
傾斜的key加上隨機數:對key進行map操作加上隨機數,計算結果後再次進行map操作,將隨機數去掉,這樣得到的結果與不加隨機數的結果是一樣的,好處是可控制並行度。
使用隨機數的步驟:
第一步:對rdd1使用maptopair運算元進行轉換,使用random random = new random();構建乙個隨機數,如任意乙個100以內的隨機數random.nextint(100)賦值給prefix,將prefix與key值拼接成乙個新的key值:prefix_key;
第二步:對rdd2使用maptopair運算元進行轉換,對於rdd1相同的key值,同樣構建隨機數加上字首,也將prefix加上key值拼接成新的key值:prefix_key;
第三步:rdd1和rdd2根絕prefix_key進行join操作;
第四步:rdd1和rdd2進行join的結果再次map運算元轉換,去掉隨機數,得到最終結果。
使用場景:兩個rdd的某乙個key或多個key值的資料量特別大,在進行join等操作時發生資料傾斜,必須知道哪些keys的值發生傾斜。
(7)通過擴容進行join
spark資料傾斜問題
資料傾斜 加更大記憶體 跟cpu硬體是效能優化的根本之道 一 資料傾斜帶來的致命性後果 1.oom 根本原因資料太多 一般oom都是由於資料傾斜所致,spark基於jvm之上的 2.速度非常慢 二 資料傾斜的基本特徵 1.任務分配不均勻 2.個別task處理過度大量的資料 shuffle過程中遇到同...
spark解決資料傾斜問題
參考 資料傾斜發生的原理 資料傾斜的原理很簡單 在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的乙個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特別大的話,就會發生資料傾斜。比如大部分key對應10條資料,但是個別key卻對應了...
Spark 資料傾斜
計算資料時,資料分散度不夠,導致大量資料集中到一台或幾台機器上計算。區域性計算遠低於平均計算速度,整個過程過慢。部分任務處理資料量過大,可能oom,任務失敗,進而應用失敗。1 executor lost driver oom shuffle過程出錯 2 正常執行任務突然失敗 3 單個executor...