Hadoop 中的兩表join

2021-08-26 19:20:20 字數 4646 閱讀 5345

**:

作為資料分析中經常進行的join 操作,傳統dbms 資料庫已經將各種演算法優化到了極致,而對於hadoop 使用的mapreduce 所進行的join 操作,去年開始也是有各種不同的算**文出現,討論各種演算法的適用場景和取捨條件,本文討論hive 中出現的幾種join 優化,然後討論其他演算法實現,希望能給使用hadoop 做資料分析的開發人員提供一點幫助.

facebook 今年在yahoo 的hadoop summit 大會上做了乙個關於最近兩個版本的hive 上所做的一些join 的優化,其中主要涉及到hive 的幾個關鍵特性: 值分割槽 , hash 分割槽 , map join , index ,

common join

最為普通的join策略,不受資料量的大小影響,也可以叫做reduce side join ,最沒效率的一種join 方式. 它由乙個mapreduce job 完成.

首先將大表和小表分別進行map 操作, 在map shuffle 的階段每乙個map output key 變成了table_name_tag_prefix + join_column_value , 但是在進行partition 的時候它仍然只使用join_column_value 進行hash.

每乙個reduce 接受所有的map 傳過來的split , 在reducce 的shuffle 階段,它將map output key 前面的table_name_tag_prefix 給捨棄掉進行比較. 因為reduce 的個數可以由小表的大小進行決定,所以對於每乙個節點的reduce 一定可以將小表的split 放入記憶體變成hashtable. 然後將大表的每一條記錄進行一條一條的比較.

map join

map join 的計算步驟分兩步,將小表的資料變成hashtable廣播到所有的map 端,將大表的資料進行合理的切分,然後在map 階段的時候用大表的資料一行一行的去探測(probe) 小表的hashtable. 如果join key 相等,就寫入hdfs.

map join 之所以叫做map join 是因為它所有的工作都在map 端進行計算.

hive 在map join 上做了幾個優化:

hive 0.6 的時候預設認為寫在select 後面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示進行設定. hive 0.7 的時候這個計算是自動化的,它首先會自動判斷哪個是小表,哪個是大表,這個引數由(hive.auto.convert.join=true)來控制. 然後控制小表的大小由(hive.smalltable.filesize=25000000l)引數控制(預設是25m),當小表超過這個大小,hive 會預設轉化成common join. 你可以檢視hive-1642. 首先小表的map 階段它會將自己轉化成mapreduce local task ,然後從hdfs 取小表的所有資料,將自己轉化成hashtable file 並壓縮打包放入distributedcache 裡面.

目前hive 的map join 有幾個限制,乙個是它打算用bloomfilter 來實現hashtable , bloomfilter 大概比hashtable 省8-10倍的記憶體, 但是bloomfilter 的大小比較難控制.

現在distributedcache 裡面hashtable預設的複製是3份,對於乙個有1000個map 的大表來說,這個數字太小,大多數map 操作都等著distributedcache 複製.

bucket map join

hive 建表的時候支援hash 分割槽通過指定clustered by (col_name,*** ) into number_buckets

buckets 關鍵字.

當連線的兩個表的join key 就是bucket column 的時候,就可以通過

hive.optimize.bucketmapjoin= true

來控制hive 執行bucket map join 了, 需要注意的是你的小表的number_

buckets 必須是大表的倍數. 無論多少個表進行連線這個條件都必須滿足.(其實如果都按照2的指數倍來分bucket, 大表也可以是小表的倍數,不過這中間需要多計算一次,對int 有效,long 和string 不清楚)

bucket map join 執行計畫分兩步,第一步先將小表做map 操作變成hashtable 然後廣播到所有大表的map端,大表的map端接受了number_

buckets 個小表的hashtable並不需要合成乙個大的hashtable,直接可以進行map 操作,map 操作會產生number_

buckets 個split,每個split 的標記跟小表的hashtable 標記是一樣的, 在執行projection 操作的時候,只需要將小表的乙個hashtable 放入記憶體即可,然後將大表的對應的split 拿出來進行判斷,所以其記憶體限制為小表中最大的那個hashtable 的大小.

bucket map join 同時也是map side join 的一種實現,所有計算都在map 端完成,沒有reduce 的都被叫做map side join ,bucket 只是hive 的一種hash partition 的實現,另外一種當然是值分割槽.

create table a  (***) partition by (col_name)

不過一般hive 中兩個表不一定會有同乙個partition key, 即使有也不一定會是join key. 所以hive 沒有這種基於值的map side join, hive 中的list partition 主要是用來過濾資料的而不是分割槽. 兩個主要引數為(hive.optimize.cp = true 和 hive.optimize.pruner=true)

hadoop 源**中預設提供map side join 的實現, 你可以在hadoop 原始碼的src/contrib/data_join/src 目錄下找到相關的幾個類.  其中taggedmapoutput 即可以用來實現hash 也可以實現list , 看你自己決定怎麼分割槽. hadoop definitive guide 第8章關於map side join 和side data distribution 章節也有乙個例子示例怎樣實現值分割槽的map side join.

sort merge bucket map join

bucket map join 並沒有解決map join 在小表必須完全裝載進記憶體的限制, 如果想要在乙個reduce 節點的大表和小表都不用裝載進記憶體,必須使兩個表都在join key 上有序才行,你可以在建表的時候就指定sorted by join key

或者使用index 的方式.

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.bucketizedhiveinputformat;

bucket columns == join columns == sort columns

這樣小表的資料可以每次只讀取一部分,然後還是用大表一行一行的去匹配,這樣的join 沒有限制記憶體的大小. 並且也可以執行全外連線.

skew join

真實資料中資料傾斜是一定的, hadoop 中預設是使用

hive.exec.reducers.bytes.per.reducer = 1000000000

也就是每個節點的reduce 預設是處理1g大小的資料,如果你的join 操作也產生了資料傾斜,那麼你可以在hive 中設定

set hive.optimize.skewjoin = true;

set hive.skewjoin.key = skew_key_threshold (default = 100000)

hive 在執行的時候沒有辦法判斷哪個key 會產生多大的傾斜,所以使用這個引數控制傾斜的閾值,如果超過這個值,新的值會傳送給那些還沒有達到的reduce, 一般可以設定成你

(處理的總記錄數/reduce個數)的2-4倍都可以接受.

傾斜是經常會存在的,一般select 的層數超過2層,翻譯成執行計畫多於3個以上的mapreduce job 都很容易產生傾斜,建議每次執行比較複雜的sql 之前都可以設一下這個引數. 如果你不知道設定多少,可以就按官方預設的1個reduce 只處理1g 的演算法,那麼  skew_key_threshold  = 1g/平均行長. 或者預設直接設成250000000 (差不多算平均行長4個位元組)

left semi join

hive 中沒有in/exist 這樣的子句,所以需要將這種型別的子句轉成left semi join. left semi join 是只傳遞表的join key給map 階段 , 如果key 足夠小還是執行map join, 如果不是則還是common join.

join 策略中的難點

大多數只適合等值連線(equal join) ,

範圍比較和全外連線沒有合適的支援

提前分割槽,零時分割槽,排序,多種不同執行計畫很難評價最優方案.

沒有考慮io 比如臨時表,網路消耗和網路延遲時間,cpu時間,

最優的方案不代表系統資源消耗最少.

Hadoop 中的兩表join

作為資料分析中經常進行的join 操作,傳統dbms 資料庫已經將各種演算法優化到了極致,而對於hadoop 使用的mapreduce 所進行的join 操作,去年開始也是有各種不同的算 文出現,討論各種演算法的適用場景和取捨條件,本文討論hive 中出現的幾種join 優化,然後討論其他演算法實現...

Hadoop中兩表JOIN的處理方法

hadoop中兩表join的處理方法 在reduce階段join。map階段標記資料來自哪個檔案,比如來自file1標記tag 1,來自file2標記tag 2。reduce階段把key相同的file1的資料和file2的資料通過笛卡爾乘積join在一起。個人理解 舉個例子 file1 有 file...

hadoop中join的基本應用

map端的主要工作 為來自不同表或檔案的key value對打標籤以區別不停 的記錄。然後用連線字段作為key,其餘部分和新加部分的標誌作為value,最後進行輸出。reduce段主要工作 在reduce端以連線字段作為key的分組已經完成,我們只需要在每乙個分組當中將那些 於不同檔案的記錄 在ma...