MapReduce過程詳解

2021-10-19 23:27:21 字數 2983 閱讀 8263

資料執行的底層目前還是以hadoop為主,我們主要接觸的還是上層抽象出來的比較方便的資料工具,例如hive,spark,impala等等,那麼hadoop底層的核心原理又是什麼呢?

hadoop的底層核心由hdfs,mapreduce和yarn組成,hdfs是大資料的儲存引擎,分布式檔案系統,yarn是資源排程系統,而mapreduce就是它的計算框架,但同時,mapreduce也是乙個程式設計模型,因為mapreduce抽象出來的框架,開發人員只需要按照其規則編寫**,就可以直接放到hadoop上去執行,也就是說開發人員可以不用關心hadoop的實現原理,呼叫原理,只需要關注業務邏輯即可,大大降低了大資料開發的門檻。

一、mapreduce是如何完成資料計算的呢?其原理如下:

mapreduce模型只包含兩個過程,其一是map,其二是reduce,

map 的主要輸入是一對 值,經過 map 計算後輸出一對 值,然後shuffle將相同 key 合併,形成 ,然後再將這個 輸入reduce,經過計算輸出零個或多個 對。

大資料領域的關係代數運算,矩陣運算,都可通過mapreduce進行。

我們來拆解一下mapreduce的過程,以wordcount為例:

wordcount解決的是把乙個文字經過處理,統計出文字中每個詞出現的頻率問題。

那麼我們正常python**實現如下:

string='''hello abc

hello hadoop

hello 123

123'''

string_list=string.replace("\n"," ").lower().split(" ")

count_dict = {}

for string in string_list:

if string in count_dict.keys():

count_dict[string] = count_dict[string]+1

else:

count_dict[string]=1

print(count_dict)

輸出結果為:

程式設計框架裡,map過程是乙個map函式完成的,reduce過程是乙個reduce函式完成。map函式和reduce函式的輸入都是乙個key,value對,輸出也都是乙個key,value對。

wordcount的mapreduce實現如下:

public class wordcount 

}}public static class intsumreducer extends reducer

result.set(sum);

context.write(key, result);}}}

詳解過程如下:

一般情況下,map過程輸入的value就是我們的輸入的一行資料,而key在計算中是用不到的,所以實現的時候可以隨便傳值,而wordcount的實現是用偏移量給它傳值的。即行首在文件中的索引位置。

傳入keyvalue對以後,map程式進行處理,處理過程是把每一行資料拆成單個的詞,並且每個詞作為輸出的key,而輸出的value,都是1,即此時讓每個詞出現的頻率都是1。輸出很多組key,value對。上述例子中的字串經過map以後,就會變成如下樣式:

然後經過乙個叫做shuffle的過程,shuffle本質是進行資料分割槽,放到wordcount裡面表現出來的就是相似的值合併,shuffle是reduce程序處理的一部分,所以可能有些地方不會把這個過程單獨拉出來,而業務**中也不會有體現。

map 輸出的 shuffle 到哪個 reduce 程序是這裡的關鍵,它是由 partitioner 來實現的,mapreduce 框架預設的 partitioner 用 key 的雜湊值對 reduce 任務數量取模,相同的 key 一定會落在相同的 reduce 任務 id 上。從實現上來看的話,這樣的 partitioner **只需要一行。

public int getpartition(k2 key, v2 value, int numreducetasks)
經過shuffle以後,map的輸出就會變成如下樣式:

4.然後把這些鍵值對傳給reduce,作為reduce的入參,經過reduce處理,reduce就對value值進行sum,然後把輸入的key繼續作為key,而sum(value)的值作為輸出的value,就輸出了最終結果:

處理過程用圖表示,如下圖:

二、mapreduce是怎麼執行的呢?

執行過程如下:

應用程序 jobclient 將使用者作業 jar 包儲存在 hdfs 中,將來這些 jar 包會分發給 hadoop 集群中的伺服器執行 mapreduce 計算。

應用程式提交 job 作業給 jobtracker。

jobtracker 根據作業排程策略建立 jobinprocess 樹,每個作業都會有乙個自己的 jobinprocess 樹。

jobinprocess 根據輸入資料分片數目(通常情況就是資料塊的數目)和設定的 reduce 數目建立相應數量的 taskinprocess。

tasktracker 程序和 jobtracker 程序進行定時通訊。

如果 tasktracker 有空閒的計算資源(有空閒 cpu 核心),jobtracker 就會給它分配任務。分配任務的時候會根據 tasktracker 的伺服器名字匹配在同一臺機器上的資料塊計算任務給它,使啟動的計算任務正好處理本機上的資料。

tasktracker 收到任務後根據任務型別(是 map 還是 reduce)和任務引數(作業 jar 包路徑、輸入資料檔案路徑、要處理的資料在檔案中的起始位置和偏移量、資料塊多個備份的 datanode 主機名等),啟動相應的 map 或者 reduce 程序。

如果是 map 程序,從 hdfs 讀取資料(通常要讀取的資料塊正好儲存在本機);如果是 reduce 程序,將結果資料寫出到 hdfs。

MapReduce過程詳解

1.輸入分片 input split 在進行map計算之前,mapreduce會根據輸入檔案計算輸入分片 input split 每個輸入分片 input split 針對乙個map任務。2.map階段 就是我們寫的map函式,map函式效率相對好控制,而且一般map操作都是本地化操作也就是在資料儲...

詳解MapReduce過程

textinputformat的原始碼注釋為 檢視inputformat介面的原始碼注釋我們了解到這個介面的作用為 在inputformat的源 中有如下兩個方法 inputsplit getsplits jobconf job,int numsplits throws ioexception 獲取...

MapReduce 過程詳解

1 最簡單的過程 map reduce map partition reduce 3 增加了在本地先進性一次reduce 本地優化 減少後期網路的傳輸量 map combine 本地reduce partition reduce 一般說來,乙個完整的mapreduce過程可以分為以上3中提到的4個步...