①inputformat
呼叫recordreader
,從輸入目錄的檔案中,讀取一組資料,封裝為keyin-valuein
物件
④outputformat呼叫recordwriter
,將reducer處理後的keyout-valueout寫出到檔案
關於這些名詞的解釋參考我之前的文章mapreduce計算框架的核心程式設計思想。需求:統計/hello目錄中每個檔案的單詞數量,
a-p開頭的單詞放入到乙個結果檔案中,
q-z開頭的單詞放入到另外乙個結果檔案中。
例如:/hello/a.txt,檔案大小200m
hello,hi,hadoop
hive,hadoop,hive,
zoo,spark,wow
zoo,spark,wow
…/hello/b.txt,檔案大小100m
hello,hi,hadoop
zoo,spark,wow
…①切片(切分資料)
/hello/a.txt 200m
/hello/b.txt 100m
預設的切分策略是以檔案為單位,以檔案的塊大小(128m)為片大小進行切片!
split0:/hello/a.txt,0-128m
split1: /hello/a.txt,128m-200m
split2: /hello/b.txt,0m-100m
split0:/hello/a.txt,0-128m--------maptask1
split1: /hello/a.txt,128m-200m--------maptask2
split2: /hello/b.txt,0m-100m--------maptask3
在mr中,所有的資料必須封裝為key-value
maptask1,2,3都會初始化乙個inputformat(預設textinputformat),每個inputformat物件負責建立乙個recordreader(linerecordreader)物件,
recordreader負責從每個切片的資料中讀取資料,封裝為key-value
linerecordreader
: 將檔案中的每一行封裝為乙個key(offset)-value(當前行的內容)
舉例:hello,hi,hadoop----->(0,hello,hi,hadoop)
hive,hadoop,hive----->(20,hive,hadoop,hive)
zoo,spark,wow----->(30,zoo,spark,wow)
zoo,spark,wow----->(40,zoo,spark,wow)
map()是map階段的核心處理邏輯! 單詞統計! map()會迴圈呼叫,對輸入的每個key-value都進行處理!
輸入:(0,hello,hi,hadoop)
輸出:(hello,1),(hi,1),(hadoop,1)
輸入:(20,hive,hadoop,hive)
輸出:(hive,1),(hadoop,1),(hive,1)
輸入:(30,zoo,spark,wow)
輸出:(zoo,1),(spark,1),(wow,1)
輸入:(40,zoo,spark,wow)
輸出:(zoo,1),(spark,1),(wow,1)
maptask2:
0號區: …
1號區: …
maptask3:
0號區: (hadoop,1),(hello,1),(hi,1),
1號區: (spark,1),(wow,1),(zoo,1)
①因為需求是生成兩個結果檔案,所以我們需要啟動兩個reducetask
reducetask啟動後,會啟動shuffle
執行緒,從maptask中拷貝相應分割槽的資料!
reducetask1
: 只負責0號區
將三個maptask,生成的0號區資料全部拷貝到reducetask所在的機器!
(hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)
(hadoop,1),(hello,1),(hi,1),
reducetask2
: 只負責1號區
將三個maptask,生成的1號區資料全部拷貝到reducetask所在的機器!
(spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)
(spark,1),(wow,1),(zoo,1)
②sort
reducetask1
: 只負責0號區進行排序:
(hadoop,1),(hadoop,1),(hadoop,1),(hello,1),(hello,1),(hi,1),(hi,1),(hive,1),(hive,1)
reducetask2
: 只負責1號區進行排序:
(spark,1),(spark,1),(spark,1),(wow,1) ,(wow,1),(wow,1),(zoo,1),(zoo,1)(zoo,1)
③reduce
reducetask1---->reducer----->reduce(一次讀入一組資料)
何為一組資料: key相同的為一組資料
輸入: (hadoop,1),(hadoop,1),(hadoop,1)
輸出: (hadoop,3)
輸入: (hello,1),(hello,1)
輸出: (hello,2)
輸入: (hi,1),(hi,1)
輸出: (hi,2)
輸入:(hive,1),(hive,1)
輸出: (hive,2)
reducetask2---->reducer----->reduce(一次讀入一組資料)
輸入: (spark,1),(spark,1),(spark,1)
輸出: (spark,3)
輸入: (wow,1) ,(wow,1),(wow,1)
輸出: (wow,3)
輸入:(zoo,1),(zoo,1)(zoo,1)
輸出: (zoo,3)
④呼叫outputformat中的recordwriter將reducer輸出的記錄寫出
reducetask1---->outputformat(預設textoutputformat)---->recordwriter(linerecorewriter)
linerecorewriter
將乙個key-value
以一行寫出,key和alue之間使用\t
分割
在輸出目錄中,生成檔案part-r-0000
hadoop 3
hello 2
hi 2
hive 2
reducetask2---->outputformat(預設textoutputformat)------>recordwriter(linerecorewriter)
linerecorewriter將
乙個key-value
以一行寫出,key和alue之間使用\t
分割
在輸出目錄中,生成檔案part-r-0001
spark 3
wow 3
zoo 3
reduce
階段(reducetask): 拷貝資料(copy)------排序(sort)-----合併(reduce)-----寫出(write)
MapReduce執行流程
mapreduce的大體流程是這樣的,如圖所示 由可以看到mapreduce執行下來主要包含這樣幾個步驟 1.首先對輸入資料來源進行切片 2.master排程worker執行map任務 3.worker讀取輸入源片段 4.worker執行map任務,將任務輸出儲存在本地 5.master排程work...
MapReduce執行流程
1.客戶端提交作業給yarn集群,rm接受客戶端所提交的作業。2.rm根據作業所要處理的檔案來決定map任務在哪些節點上執行,然後確定reduce任務在哪些節點 nn 上執行。3.rm分配map任務和reduce任務到相應的節點上。4.map任務開始執行,將執行結果臨時儲存到本地 執行過map任務的...
MapReduce執行流程
mapreducer工作流程圖 mapreducer工作流程 reducer shuffle start 6.reducer shuffle啟動後會到不同的map結果檔案中拉取相同區號的結果檔案,再合併這些來自不同map的結果檔案,再將這些檔案合併 歸併演算法 產生的大檔案是分割槽且排序且分好組了的...