在做shuffle階段的優化過程中,遇到了資料傾斜的問題,造成了對一些情況下優化效果不明顯。主要是因為在job完成後的所得到的 counters是整個job的總和,優化是基於這些counters得出的平均值,而由於資料傾斜的原因造成map處理資料量的差異過大,使得這些平均 值能代表的價值降低。hive的執行是分階段的,map處理資料量的差異取決於上乙個stage的reduce輸出,所以如何將資料均勻的分配到各個 reduce中,就是解決資料傾斜的根本所在。規避錯誤來更好的執行比解決錯誤更高效。在檢視了一些資料後,總結如下。
情形
後果
join
其中乙個表較小,
但是key集中
分發到某乙個或幾個reduce上的資料遠高於平均值
大表與大表,但是分桶的判斷欄位0值或空值過多
這些空值都由乙個reduce處理,灰常慢
group by
group by 維度過小,
某值的數量過多
處理某值的reduce灰常耗時
count distinct
某特殊值過多
處理此特殊值的reduce耗時
1)、key分布不均勻
2)、業務資料本身的特性
3)、建表時考慮不周
4)、某些sql語句本身就有資料傾斜
任務進度長時間維持在99%(或100%),檢視任務監控頁面,發現只有少量(1個或幾個)reduce子任務未完成。因為其處理的資料量和其他reduce差異過大。
單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。 最長時長遠大於平均時長。
hive.map.aggr=true
map 端部分聚合,相當於combiner
hive.groupby.skewindata=true
有資料傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計畫會有兩個 mr job。第乙個 mr job 中,map 的輸出結果集合會隨機分布到 reduce 中,每個 reduce 做部分聚合操作,並輸出結果,這樣處理的結果是相同的 group by key 有可能被分發到不同的 reduce 中,從而達到負載均衡的目的;第二個 mr job 再根據預處理的資料結果按照 group by key 分布到 reduce 中(這個過程可以保證相同的 group by key 被分布到同乙個 reduce 中),最後完成最終的聚合操作。
如何join:
關於驅動表的選取,選用join key分布最均勻的表作為驅動表
做好列裁剪和filter操作,以達到兩表做join的時候,資料量相對變小的效果。
大小表join:
使用map join讓小的維度表(1000條以下的記錄條數) 先進記憶體。在map端完成reduce.
大表join大表:
把空值的key變成乙個字串加上隨機數,把傾斜的資料分到不同的reduce上,由於null值關聯不上,處理後並不影響最終結果。
count distinct大量相同特殊值
count distinct時,將值為空的情況單獨處理,如果是計算count distinct,可以不用處理,直接過濾,在最後結果中加1。如果還有其他計算,需要進行group by,可以先將值為空的記錄單獨處理,再和其他計算結果進行union。
group by維度過小:
採用sum() group by的方式來替換count(distinct)完成計算。
特殊情況特殊處理:
在業務邏輯優化效果的不大情況下,有些時候是可以將傾斜的資料單獨拿出來處理。最後union回去。
場景:如日誌中,常會有資訊丟失的問題,比如日誌中的 user_id,如果取其中的 user_id 和 使用者表中的user_id 關聯,會碰到資料傾斜的問題。
解決方法1:user_id為空的不參與關聯(紅色字型為修改後)
select * from loga解決方法2:賦與空值分新的key值joinusers b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from loga
where a.user_id is null;
select *結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , 』』, null 等) 產生的傾斜問題。把空值的 key 變成乙個字串加上隨機數,就能把傾斜的資料分到不同的reduce上 ,解決資料傾斜問題。from loga
left outer joinusers b
on case when a.user_id is null then concat(『hive』,rand() ) else a.user_id end = b.user_id;
場景:使用者表中user_id欄位為int,log表中user_id欄位既有string型別也有int類 型。當按照user_id進行兩個表的join操作時,預設的hash操作會按int型的id來進行分配,這樣會導致所有string型別id的記錄都分 配到乙個reducer中。
解決方法:把數字型別轉換成字串型別
select * fromusers a使用 map join 解決小表(記錄數少)關聯大表的資料傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理。以下例子:left outer joinlogs b
on a.usr_id = cast(b.user_id as string)
select * from logausers 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支援這麼大的小表。如果用普通的 join,又會碰到資料傾斜的問題。left outer joinusers b
on a.user_id = b.user_id;
解決方法:
select /*+mapjoin(x)*/* from log a假如,log裡user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點選的會員不會太多,有佣金的會員不會太多等等。所以這個方法能解決很多場景下的資料傾斜問題。left outer join (
select /*+mapjoin(c)*/d.*
from ( select distinct user_id from log ) c
join users d
on c.user_id = d.user_id
) xon a.user_id = b.user_id;
使map的輸出資料更均勻的分布到reduce中去,是我們的最終目標。由於hash演算法的侷限性,按key hash會或多或少的造成資料傾斜。大量經驗表明資料傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。在此給出較為通用的步驟:
1、取樣log表,哪些user_id比較傾斜,得到乙個結果表tmp1。由於對計算框架來說,所有的資料過來,他都是不知道資料分布情況的,所以取樣是並不可少的。
2、資料的分布符合社會學統計規則,貧富不均。傾斜的key不會太多,就像乙個社會的富人不多,奇特的人不多一樣。所以tmp1記錄數會很少。把 tmp1和users做map join生成tmp2,把tmp2讀到distribute file cache。這是乙個map過程。
3、map讀入users和log,假如記錄來自log,則檢查user_id是否在tmp2裡,如果是,輸出到本地檔案a,否則生 成的key,value對,假如記錄來自member,生成的 key,value對,進入reduce階段。
4、最終把a檔案,把stage3 reduce階段輸出的檔案合併起寫到hdfs。
如果確認業務需要這樣傾斜的邏輯,考慮以下的優化方案:
1、對於join,在判斷小表不大於1g的情況下,使用map join
2、對於group by或distinct,設定 hive.groupby.skewindata=true
3、盡量使用上述的sql語句調節進行優化
Hive資料傾斜總結
傾斜的原因 使map的輸出資料更均勻的分布到reduce中去,是我們的最終目標。由於hash演算法的侷限性,按key hash會或多或少的造成資料傾斜。大量經驗表明資料傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。解決思路 hive的執行是分階段的,map處理資料量的差異取決於上乙個stage的r...
Hive資料傾斜
hive資料傾斜問題 傾斜原因 map輸出資料按key hash分配到reduce中,由於key分布不均勻 或者業務資料本身的特點。等原因造成的reduce上的資料量差異過大。1.1 key分布不均勻 1.2 業務資料本身的特性 1.3 sql語句造成資料傾斜 解決方案 1 引數調節 hive.ma...
HIVE 資料傾斜
解決資料傾斜,歸根結底是使map的輸出資料更均勻的分布到reduce中去。1 join 1 其中乙個表較小,但是key集中。分發到某乙個或幾個reduce上的資料遠高於平均值 2 大表與大表,但是分桶的判斷欄位0值或空值過多。這些空值都由乙個reduce處理,非常慢 2 group by group...