如果資料量比較大,在記憶體進行連線操會發生oom。mapreduce join可以用來解決大資料的連線。
在map階段, 把關鍵字作為key輸出,並在value中標記出資料是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每乙個value是來自data1還是data2,在內部分成2組,做集合的乘積。
這種方法有2個問題:
1, map階段沒有對資料**,shuffle的網路傳輸和排序效能很低。兩份資料中,如果有乙份資料比較小,小資料全部載入到記憶體,按關鍵字建立索引。大資料檔案作為map的輸入檔案,對map()函式每一對輸入,都能夠方便地和已載入到記憶體的小資料進行連線。把連線結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,並且連線好了的資料。2, reduce端對2個集合做乘積計算,很耗記憶體,容易導致oom。
這種方法,要使用hadoop中的distributedcache把小資料分布到各個計算節點,每個map節點都要把小資料庫載入到記憶體,按關鍵字建立索引。
這種方法有明顯的侷限性:有乙份資料比較小,在map端,能夠把它載入到記憶體,並進行join操作。針對map join,可以把乙份資料存放到專門的記憶體伺服器,在map()方法中,對每乙個的輸入對,根據key到記憶體伺服器中取出資料,進行連線
對其中乙份資料在記憶體中建立bloomfilter,另外乙份資料在連線之前,用bloomfilter判斷它的key是否存在,如果不存在,那這個記錄是空連線,可以忽略。
在mapreduce包裡看到有專門為join設計的包,對這些包還沒有學習,不知道怎麼使用,只是在這裡記錄下來,作個提醒。
jar: mapreduce-client-core.jar相對而言,map join更加普遍,下面的**使用distributedcache實現map joinpackage: org.apache.hadoop.mapreduce.lib.join
有客戶資料customer和訂單資料orders。
customer
客戶編號
姓名位址**1
hanmeimei
shanghai
1102
leilei
beijing
1123
lucy
guangzhou
119
** order**
訂單編號
客戶編號
其它欄位被忽略11
502120033
154335053
586142
713528
2113592
40010
22000112
300
要求對customer和orders按照客戶編號進行連線,結果要求對客戶編號分組,對訂單編號排序,對其它欄位不作要求
客戶編號
訂單編號
訂單金額
姓名位址**1
150hanmeimei
shanghai
1101
2200
hanmeimei
shanghai
1101642
hanmeimei
shanghai
1101
7352
hanmeimei
shanghai
1102
81135
leilei
beijing
1122
9400
leilei
beijing
1122
102000
leilei
beijing
1122
11300
leilei
beijing
1123315
lucy
guangzhou
1193
4350
lucy
guangzhou
1193558
lucy
guangzhou
119
在提交job的時候,把小資料通過distributedcache分發到各個節點。
map端使用distributedcache讀到資料,在記憶體中構建對映關係--如果使用專門的記憶體伺服器,就把資料載入到記憶體伺服器,map()節點可以只保留乙份小快取;如果使用bloomfilter來加速,在這裡就可以構建;
map()函式中,對每一對,根據key到第2)步構建的對映裡面中找出資料,進行連線,輸出。
public
class
join
extends
configured
implements
tool
public
customerbean
(int custid, string name, string address,
string phone)
public
intgetcustid
() public string getname
() public string getaddress
() public string getphone
() }
private
static
class
custordermapoutkey
implements
writablecomparable
public
intgetcustid
()
public
intgetorderid
()
@override
public
void
write
(dataoutput out)
throws ioexception
@override
public
void
readfields
(datainput in)
throws ioexception
@override
public
intcompareto
(custordermapoutkey o)
@override
public
boolean
equals
(object obj)
else
}@override
public string tostring
() }
private
static
class
extends
int custid = integer.parseint(cols[1]); // 取出客戶編號
customerbean customerbean = customer_map.get(custid);
if (customerbean == null)
stringbuffer sb = new stringbuffer();
outputvalue.set(sb.tostring());
outputkey.set(custid, integer.parseint(cols[0]));
context.write(outputkey, outputvalue);
}@override
protected
void
setup
(context context)
throws ioexception, interruptedexception
customerbean bean = new customerbean(integer.parseint(cols[0]), cols[1], cols[2], cols[3]);
customer_map.put(bean.getcustid(), bean);}}
}/**
* reduce
* @author ivan**/
private
static
class
joinreducer
extends
reducer }
}/**
* @param args
* @throws exception
*/public
static
void
main
(string args)
throws exception
toolrunner.run(new configuration(), new join(), args);
}@override
public
intrun
(string args)
throws exception
}
執行環境
==客戶資料檔案在hdfs上的位置硬編碼為==
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 執行程式之前先把客戶資料上傳到這個位置。
如何使用MapReduce實現兩表的join
map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...
如何使用MapReduce實現兩表的join
map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...
如何使用MapReduce實現兩表的join
map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...