原**:
具體思路是給每個資料來源加上乙個數字標記label,這樣hadoop對其排序後同乙個欄位的資料排在一起並且按照label排好序了,於是直接將相鄰相同key的資料合併在一起輸出就得到了結果。
1、 map階段:給表1和表2加標記,其實就是多輸出乙個字段,比如表一加標記為0,表2加標記為2;
2、 partion階段:根據學號key為第一主鍵,標記label為第二主鍵進行排序和分割槽
3、 reduce階段:由於已經按照第一主鍵、第二主鍵排好了序,將相鄰相同key資料合併輸出
hadoop使用python實現join的map和reduce**
# -*- coding: utf-8 -*-
#來自瘋狂的螞蟻www.crazyant.net
import os
import sys
#獲取當前正在處理的檔案的名字,這裡我們有兩個輸入檔案
#所以要加以區分
filepath = os.environ["map_input_file"]
filename = os.path.split(filepath)[-1]
for line in sys.stdin:
if line.strip()=="":
continue
fields = line[:-1].split("\t")
sno = fields[0]
#以下判斷filename的目的是不同的檔案有不同的字段,並且需加上不同的標記
if filename == 'data_info':
name = fields[1]
#下面的數字'0'就是為資料來源1加上的統一標記
print '\t'.join((sno,'0',name))
elif filename == 'data_grade':
courseno = fields[1]
grade = fields[2]
#下面的數字'1'就是為資料來源1加上的統一標記
print '\t'.join((sno,'1',courseno,grade))
if __name__=='__main__':
reducer的**:
# -*- coding: utf-8 -*-#reducer.py
#來自瘋狂的螞蟻www.crazyant.net
import sys
def reducer():
#為了記錄和上乙個記錄的區別,用lastsno記錄上個sno
lastsno = ""
for line in sys.stdin:
if line.strip()=="":
continue
fields = line[:-1].split("\t")
sno = fields[0]
'''處理思路:
遇見當前key與上一條key不同並且label=0,就記錄下來name值,
當前key與上一條key相同並且label==1,則將本條資料的courseno、
grade聯通上一條記錄的name一起輸出成最終結果
'''if sno != lastsno:
name=""
#這裡沒有判斷label==1的情況,
#因為sno!=lastno,並且label=1表示該條key沒有資料來源1的資料
if fields[1]=="0":
name=fields[2]
elif sno==lastno:
#這裡沒有判斷label==0的情況,
#因為sno==lastno並且label==0表示該條key沒有資料來源2的資料
if fields[2]=="1":
courseno=fields[2]
grade=fields[3]
if name:
print '\t'.join((lastsno,name,courseno,grade))
lastsno = sno
if __name__=='__main__':
reducer()
#先刪除輸出目錄
~/hadoop-client/hadoop/bin/hadoop fs -rmr /hdfs/jointest/output
#來自瘋狂的螞蟻www.crazyant.net
#注意,下面配置中的環境值每個人機器不一樣
~/hadoop-client/hadoop/bin/hadoop streaming \
-d mapred.map.tasks=10 \
-d mapred.reduce.tasks=5 \
-d mapred.job.map.capacity=10 \
-d mapred.job.reduce.capacity=5 \
-d mapred.job.name="join--sno_name-sno_courseno_grade" \
-d num.key.fields.for.partition=1 \
-d stream.num.map.output.key.fields=2 \
-partitioner org.apache.hadoop.mapred.lib.keyfieldbasedpartitioner \
-input "/hdfs/jointest/input/*" \
-output "/hdfs/jointest/output" \
-reducer "python26/bin/python26.sh reducer.py" \
-file "reducer.py" \
-cachearchive "/share/python26.tar.gz#python26"
#看看執行成功沒,若輸出0則表示成功了
echo $?
可以自己手工構造輸入輸出資料進行測試,本程式是驗證過的。
判斷讀入資料檔案結尾 從檔案讀入資料
參考 在使用c c 讀檔案的時候,使用eof 這個函式來判斷檔案是否為空或者是否讀到檔案結尾的時候會有一些特殊情況 先看 include include using namespace std int main e return 0 上述 在vs2012下編譯執行,發現,當檔案結尾沒有空行時,結果正...
用C 一次讀入和寫出多個檔案 fstream
在c 中,一般我們使用freopen來進行一些檔案操作,但是freopen的bug很多,只能讀入單個檔案,那有沒有讀入多個檔案的辦法呢?當然有!那就是fsream!舉個例子 讓你將兩個檔案的字串交換。用freopen完全做不了,用fstream include using namespace std...
Java 檔案讀入方法
獲得控制台使用者輸入的資訊 public string getinputmessage throws ioexception.五.轉移檔案目錄 轉移檔案目錄不等同於複製檔案,複製檔案是複製後兩個目錄都存在該檔案,而轉移檔案目錄則是轉移後,只有新目錄中存在該檔案。public void changed...