MapReduce的執行流程概述

2021-10-08 01:36:22 字數 4044 閱讀 8085

inputformat呼叫recordreader,從輸入目錄的檔案中,讀取一組資料,封裝為keyin-valuein物件

④outputformat呼叫recordwriter,將reducer處理後的keyout-valueout寫出到檔案

關於這些名詞的解釋參考我之前的文章mapreduce計算框架的核心程式設計思想。

需求:統計/hello目錄中每個檔案的單詞數量,

a-p開頭的單詞放入到乙個結果檔案中,

q-z開頭的單詞放入到另外乙個結果檔案中。

例如:/hello/a.txt,檔案大小200m

hello,hi,hadoop

hive,hadoop,hive,

zoo,spark,wow

zoo,spark,wow

/hello/b.txt,檔案大小100m

hello,hi,hadoop

zoo,spark,wow

…①切片(切分資料)

/hello/a.txt 200m

/hello/b.txt 100m

預設的切分策略是以檔案為單位,以檔案的塊大小(128m)為片大小進行切片!

split0:/hello/a.txt,0-128m

split1: /hello/a.txt,128m-200m

split2: /hello/b.txt,0m-100m

split0:/hello/a.txt,0-128m--------maptask1

split1: /hello/a.txt,128m-200m--------maptask2

split2: /hello/b.txt,0m-100m--------maptask3

在mr中,所有的資料必須封裝為key-value

maptask1,2,3都會初始化乙個inputformat(預設textinputformat),每個inputformat物件負責建立乙個recordreader(linerecordreader)物件,

recordreader負責從每個切片的資料中讀取資料,封裝為key-value

linerecordreader: 將檔案中的每一行封裝為乙個key(offset)-value(當前行的內容)

舉例:hello,hi,hadoop----->(0,hello,hi,hadoop)

hive,hadoop,hive----->(20,hive,hadoop,hive)

zoo,spark,wow----->(30,zoo,spark,wow)

zoo,spark,wow----->(40,zoo,spark,wow)

map()是map階段的核心處理邏輯! 單詞統計! map()會迴圈呼叫,對輸入的每個key-value都進行處理!

輸入:(0,hello,hi,hadoop)

輸出:(hello,1),(hi,1),(hadoop,1)

輸入:(20,hive,hadoop,hive)

輸出:(hive,1),(hadoop,1),(hive,1)

輸入:(30,zoo,spark,wow)

輸出:(zoo,1),(spark,1),(wow,1)

輸入:(40,zoo,spark,wow)

輸出:(zoo,1),(spark,1),(wow,1)

maptask2:

0號區: …

1號區: …

maptask3:

0號區: (hadoop,1),(hello,1),(hi,1),

1號區: (spark,1),(wow,1),(zoo,1)

①因為需求是生成兩個結果檔案,所以我們需要啟動兩個reducetask

reducetask啟動後,會啟動shuffle執行緒,從maptask中拷貝相應分割槽的資料!

reducetask1: 只負責0號區

將三個maptask,生成的0號區資料全部拷貝到reducetask所在的機器!

(hadoop,1),(hadoop,1),(hello,1),(hi,1),(hive,1),(hive,1)

(hadoop,1),(hello,1),(hi,1),

reducetask2: 只負責1號區

將三個maptask,生成的1號區資料全部拷貝到reducetask所在的機器!

(spark,1),(spark,1),(wow,1) ,(wow,1),(zoo,1)(zoo,1)

(spark,1),(wow,1),(zoo,1)

②sort

reducetask1: 只負責0號區進行排序:

(hadoop,1),(hadoop,1),(hadoop,1),(hello,1),(hello,1),(hi,1),(hi,1),(hive,1),(hive,1)

reducetask2: 只負責1號區進行排序:

(spark,1),(spark,1),(spark,1),(wow,1) ,(wow,1),(wow,1),(zoo,1),(zoo,1)(zoo,1)

③reduce

reducetask1---->reducer----->reduce(一次讀入一組資料)

何為一組資料: key相同的為一組資料

輸入: (hadoop,1),(hadoop,1),(hadoop,1)

輸出: (hadoop,3)

輸入: (hello,1),(hello,1)

輸出: (hello,2)

輸入: (hi,1),(hi,1)

輸出: (hi,2)

輸入:(hive,1),(hive,1)

輸出: (hive,2)

reducetask2---->reducer----->reduce(一次讀入一組資料)

輸入: (spark,1),(spark,1),(spark,1)

輸出: (spark,3)

輸入: (wow,1) ,(wow,1),(wow,1)

輸出: (wow,3)

輸入:(zoo,1),(zoo,1)(zoo,1)

輸出: (zoo,3)

④呼叫outputformat中的recordwriter將reducer輸出的記錄寫出

reducetask1---->outputformat(預設textoutputformat)---->recordwriter(linerecorewriter)

linerecorewriter將乙個key-value以一行寫出,key和alue之間使用\t分割

在輸出目錄中,生成檔案part-r-0000

hadoop 3

hello 2

hi 2

hive 2

reducetask2---->outputformat(預設textoutputformat)------>recordwriter(linerecorewriter)

linerecorewriter將乙個key-value以一行寫出,key和alue之間使用\t分割

在輸出目錄中,生成檔案part-r-0001

spark 3

wow 3

zoo 3

reduce階段(reducetask): 拷貝資料(copy)------排序(sort)-----合併(reduce)-----寫出(write)

MapReduce執行流程

mapreduce的大體流程是這樣的,如圖所示 由可以看到mapreduce執行下來主要包含這樣幾個步驟 1.首先對輸入資料來源進行切片 2.master排程worker執行map任務 3.worker讀取輸入源片段 4.worker執行map任務,將任務輸出儲存在本地 5.master排程work...

MapReduce執行流程

1.客戶端提交作業給yarn集群,rm接受客戶端所提交的作業。2.rm根據作業所要處理的檔案來決定map任務在哪些節點上執行,然後確定reduce任務在哪些節點 nn 上執行。3.rm分配map任務和reduce任務到相應的節點上。4.map任務開始執行,將執行結果臨時儲存到本地 執行過map任務的...

MapReduce執行流程

mapreducer工作流程圖 mapreducer工作流程 reducer shuffle start 6.reducer shuffle啟動後會到不同的map結果檔案中拉取相同區號的結果檔案,再合併這些來自不同map的結果檔案,再將這些檔案合併 歸併演算法 產生的大檔案是分割槽且排序且分好組了的...