《轉》MapReduce實現的Join

2021-07-24 13:01:30 字數 4677 閱讀 9702

如果資料量比較大,在記憶體進行連線操會發生oom。mapreduce join可以用來解決大資料的連線。

在map階段, 把關鍵字作為key輸出,並在value中標記出資料是來自data1還是data2。因為在shuffle階段已經自然按key分組,reduce階段,判斷每乙個value是來自data1還是data2,在內部分成2組,做集合的乘積。

這種方法有2個問題:

1, map階段沒有對資料**,shuffle的網路傳輸和排序效能很低。

2, reduce端對2個集合做乘積計算,很耗記憶體,容易導致oom。

兩份資料中,如果有乙份資料比較小,小資料全部載入到記憶體,按關鍵字建立索引。大資料檔案作為map的輸入檔案,對map()函式每一對輸入,都能夠方便地和已載入到記憶體的小資料進行連線。把連線結果按key輸出,經過shuffle階段,reduce端得到的就是已經按key分組的,並且連線好了的資料。

這種方法,要使用hadoop中的distributedcache把小資料分布到各個計算節點,每個map節點都要把小資料庫載入到記憶體,按關鍵字建立索引。

這種方法有明顯的侷限性:有乙份資料比較小,在map端,能夠把它載入到記憶體,並進行join操作。

針對map join,可以把乙份資料存放到專門的記憶體伺服器,在map()方法中,對每乙個的輸入對,根據key到記憶體伺服器中取出資料,進行連線

對其中乙份資料在記憶體中建立bloomfilter,另外乙份資料在連線之前,用bloomfilter判斷它的key是否存在,如果不存在,那這個記錄是空連線,可以忽略。

在mapreduce包裡看到有專門為join設計的包,對這些包還沒有學習,不知道怎麼使用,只是在這裡記錄下來,作個提醒。

jar: mapreduce-client-core.jar

package: org.apache.hadoop.mapreduce.lib.join

相對而言,map join更加普遍,下面的**使用distributedcache實現map join

有客戶資料customer和訂單資料orders。

customer

客戶編號

姓名位址**1

hanmeimei

shanghai

1102

leilei

beijing

1123

lucy

guangzhou

119

** order**

訂單編號

客戶編號

其它欄位被忽略11

502120033

154335053

586142

713528

2113592

40010

22000112

300

要求對customer和orders按照客戶編號進行連線,結果要求對客戶編號分組,對訂單編號排序,對其它欄位不作要求

客戶編號

訂單編號

訂單金額

姓名位址**1

150hanmeimei

shanghai

1101

2200

hanmeimei

shanghai

1101642

hanmeimei

shanghai

1101

7352

hanmeimei

shanghai

1102

81135

leilei

beijing

1122

9400

leilei

beijing

1122

102000

leilei

beijing

1122

11300

leilei

beijing

1123315

lucy

guangzhou

1193

4350

lucy

guangzhou

1193558

lucy

guangzhou

119

在提交job的時候,把小資料通過distributedcache分發到各個節點。

map端使用distributedcache讀到資料,在記憶體中構建對映關係--如果使用專門的記憶體伺服器,就把資料載入到記憶體伺服器,map()節點可以只保留乙份小快取;如果使用bloomfilter來加速,在這裡就可以構建;

map()函式中,對每一對,根據key到第2)步構建的對映裡面中找出資料,進行連線,輸出。

public

class

join

extends

configured

implements

tool

public

customerbean

(int custid, string name, string address,

string phone)

public

intgetcustid

() public string getname

() public string getaddress

() public string getphone

() }

private

static

class

custordermapoutkey

implements

writablecomparable

public

intgetcustid

()

public

intgetorderid

()

@override

public

void

write

(dataoutput out)

throws ioexception

@override

public

void

readfields

(datainput in)

throws ioexception

@override

public

intcompareto

(custordermapoutkey o)

@override

public

boolean

equals

(object obj)

else

}@override

public string tostring

() }

private

static

class

extends

int custid = integer.parseint(cols[1]); // 取出客戶編號

customerbean customerbean = customer_map.get(custid);

if (customerbean == null)

stringbuffer sb = new stringbuffer();

outputvalue.set(sb.tostring());

outputkey.set(custid, integer.parseint(cols[0]));

context.write(outputkey, outputvalue);

}@override

protected

void

setup

(context context)

throws ioexception, interruptedexception

customerbean bean = new customerbean(integer.parseint(cols[0]), cols[1], cols[2], cols[3]);

customer_map.put(bean.getcustid(), bean);}}

}/**

* reduce

* @author ivan**/

private

static

class

joinreducer

extends

reducer }

}/**

* @param args

* @throws exception

*/public

static

void

main

(string args)

throws exception

toolrunner.run(new configuration(), new join(), args);

}@override

public

intrun

(string args)

throws exception

}

執行環境

==客戶資料檔案在hdfs上的位置硬編碼為==

hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 執行程式之前先把客戶資料上傳到這個位置。

如何使用MapReduce實現兩表的join

map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...

如何使用MapReduce實現兩表的join

map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...

如何使用MapReduce實現兩表的join

map join map side join 是針對一下場景進行的優化。兩個待連線的表中,有乙個表非常大,而另乙個非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每乙個map task記憶體中存在乙份 比如放在hash table中 然後只掃瞄大表 對於大表中的每一條記錄k...