Hadoop學習 關於MapReduce

2021-10-05 06:20:35 字數 4451 閱讀 8728

mapreduce 是一種可用於資料處理的程式設計模型。mapreduce 任務過程分為兩個處理階段:map階段和reduce階段。每個階段都是以鍵-值對作為輸入和輸出。這些階段任務執行在集群上的節點上,並通過yarn進行排程,如果乙個任務失敗,它將在另乙個不同的節點上自動重新排程執行。mapreduce 程式本質上是並行執行的,因此可以將大規模的資料分析任務分發給任何乙個擁有足夠多機器的資料中心。mapreduce的優勢在於處理大規模資料集。

mapreduce 執行流程

hadoop 將mapreduce 的輸入資料劃分成等長的小資料塊,稱為輸入分片(input split)或簡稱「分片」。hadoop 為每個分片構建乙個map任務,並由該任務來執行使用者自定義的map函式從而處理分片中的每條記錄。擁有很多分片,意味著處理每個分片所需要的時間少於處理整個輸入資料所花的時間。因此,如果並行處理每個分片,且每個分片資料比較小,那麼整個處理過程將獲得更好的負載平衡,另一方面,如果分片切分的太小,那麼管理分片的總時間和構建map任務的總時間將決定作業的整個執行時間。對於大多數作業來說,乙個合理的分片大小趨向於hdfs的乙個塊的大小,預設是128mb,不過可以調整這個預設值。

split 的個數決定了map的個數,影響個數的因素主要有:1.hdfs塊的大小 2.檔案的大小 3.檔案的個數,小檔案會使map的個數增加 4。splitsize 大小,預設為block大小,可以通過引數進行修改調節

map 函式開始產生輸出時,並不是簡單的將它寫到磁碟,而是利用緩衝的方式寫到記憶體並出於效率考慮進行預排序。每個map任務都有乙個環形記憶體緩衝區用於儲存任務輸出。在預設情況下,緩衝區的大小為100mb,這個值可以通過改變mapreduce.task.io.sort.mb 屬性來調整。一旦緩衝內容達到閾值(mapreduce.map.sort.spill.percent,預設為0.8),乙個後台執行緒便開始吧內容溢位(spill)到磁碟。在溢位寫到磁碟過程中,map輸出繼續寫到緩衝區,但如果在此期間緩衝區被填滿,map會被阻塞直到寫磁碟過程完成。溢寫過程按輪詢方式將緩衝區中的內容寫到(mapreduce.cluster.local.dir 屬性)指定的目錄中。

hadoop 在儲存有輸入資料的節點上執行map任務,可以獲得最佳效能,因為它無需使用寶貴的集群頻寬資源(資料本地化優化)。但是,有時對於乙個map任務的輸入分片來說,儲存該分片的hdfs資料塊副本的所有節點可能正在執行其他map任務,此時作業排程需要從某一資料塊所在的機架中的乙個節點上尋找乙個空閒的map槽(slot)來執行該map任務分片。所以分片的的大小應該與塊大小相同,因為他是確保可以儲存在單個節點上的最大輸入塊的大小。如果分片跨越兩個資料塊,那麼對於任何乙個hdfs節點,基本上都不可能同時儲存這兩個資料塊,因此分片中的部分資料需要通過網路傳輸到map任務執行的節點,這種方法顯然效率更低。

map 任務將其輸出寫本地硬碟,而非hdfs。因為map的輸出是中間結果,該中間結果有reduce處理後才產生最終輸出結果,而且一旦作業完成,map的輸出結果就會刪除。如果執行map任務的節點在將map中間結果傳送給reduce任務之前失敗,hadoop將在另乙個節點上重新執行這個map任務以再次構建map中間結果。

在進入reduce前,mapreduce 保證每個reducer的輸入都已按key值排序,使得具有相同key的資料彼此相鄰。如果指定了合併操作(combiner),將具有相同key的資料進行聚合。系統執行排序過程的過程-map輸出傳到reducer作為後者的輸入-即為shuffle。

reduce 任務需要為其特定分割槽檔案從集群上若干個map任務的map任務。map任務可以在不同時間完成,因此只要有乙個任務結束,reduce任務就開始複製其輸出。reduce任務有少量複製執行緒,一次能夠並行的取得map輸出,預設是5個執行緒。

split 實際上每個split 包含後乙個block 中開頭部分的資料(解決記錄跨block問題)

recordreader 每讀取一條記錄,呼叫一次map函式

map 進行map處理

shuffle partiton 分割槽 sort 排序,spill 溢寫,merge 合併 ,combiner 合併 copy memory disk, 效能可優化的地方

partitoner 決定資料由哪個reduce 處理,從而分割槽 比如採用hash 法,hash 取模

memorybuffer:記憶體緩衝區,每個map 的結果和partition 處理key value的結果都儲存在快取中 預設大小 100m 溢寫閾值:100m * 0.8 =80m 緩衝區的數:partition key value 三元組資料

spill 記憶體緩衝區達到閾值時,溢寫spill執行緒鎖住這80m的緩衝區,開始將資料寫出到本地磁碟中,然後釋放記憶體,每一次溢寫都生成乙個資料檔案。溢寫出的資料到磁碟前會對資料進行key排序sort 以及合併combiner

sort 緩衝區資料按照key進行排序

combiner 資料合併,相同的key的資料,value值合併,減少輸出傳輸量 在不影響最終結果時,可以極大提公升效能,但例如求中值情況下就會影響結果

spill & sort :和map一樣,記憶體緩衝滿時,也通過sort和 combiner ,將資料溢寫到硬碟檔案中

reduce 多個reduce任務輸入的資料都屬於不同的partion ,因此結果資料的key不會重複,合併reduce 的輸出檔案即可得到最終的結果

python 寫 hadoop streaming作業,寫map和reduce統計文字中的wordcount,統計文字中每個單詞出現的次數。文字內的數字為word,出現的的次數為count

map 實現

import re

p = re.

compile

(r'\w+'

)data_path =

'd:\\workspace4python\\test.txt'

# 讀取test文字,用空格進行切分,過濾只留下單詞,然後輸出每個word,例word,1

with

open

(data_path,

'r',encoding=

'utf-8'

)as f:

for line in f.readlines():

word_st = line.strip(

).split(

" ")

for word in word_st:

word_re = p.findall(word);if

len(word_re)==0

:continue

word = word_re[0]

.lower(

)print

('%s,%s'

%(word,1)

)

reduce 實現

import sys

cur_word =

none

sum=

0#統計輸入每個單詞出現的次數 wordcount

for line in sys.stdin:

word,val = line.strip(

).split(

',')

if cur_word==

none

: cur_word = word

if cur_word!=word:

print

'%s\t%s'

%(cur_word,

sum)

cur_word = word

sum=0

sum+=

int(val)

print

'%s\t%s'

%(cur_word,

sum)

run 指令碼

hadoop_cmd=

"/usr/local/src/hadoop-2.6.1/bin/hadoop"

stream_jar_path=

"/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

input_file_path_1=

"/data/test.txt"

output_path=

"/output/mr"

$hadoop_cmd fs -rmr -skiptrash $output_path

$hadoop_cmd jar $stream_jar_path \

-input $input_file_path_1 \

-output $output_path \

-reducer "python red_t.py" \

-file ./map_t.py \

-file ./red_t.py

hadoop系統 分布式計算框架MapReduce

單機程式計算流程 輸入資料 讀取資料 處理資料 寫入資料 輸出資料 hadoop計算流程 input data 輸入資料 inputformat 對資料進行切分,格式化處理 map 將前面切分的資料做map處理 將資料進行分類,輸出 k,v 鍵值對資料 shuffle sort 將相同的資料放在一起...

Hadoop學習筆記 Hadoop初識

序言 資訊化發展到當今,網際網路的資料量是不斷地增加,那麼如何很好的處理以及利用這些資料可能是未來的乙個發展方向,這也之所以產生了各種平台的雲計算。對於網際網路而言,大資料量可分為兩種 第 一 大訪問量請求 第 二 大資料量處理。大訪問量請求這個事應用端應該思考的問題,如何很好的處理大的訪問量,如何...

Hadoop學習一 Hadoop版本

一.hadoop社群版和發行版 社群版 我們把apache社群一直開發的hadoop稱為社群版。簡單的說就是apache hadoop 發行版 基於apache hadoop的基礎上進行商業改造的解決方案,包含一系列定製的管理工具和軟體。二.hadoop社群版版本號 一直以來,hadoop的版本號一...