分布式計算追求:沒有計算發生。
支撐了計算向資料移動,和計算的並行度。
做的最主要的是:儲存與計算解耦,就是對所要進行切片的資料進行split切片,(split == map並行度
)。
split預設是與block塊數量一致,目的是為了計算向資料移動
,幾個block塊分布在幾個地方,就起幾個map,這樣就不需要讓大量資料進行移動,而是只需要將jar包分發到各個block所在的結點進行執行即可。
使用者也可以對split進行自定義:
當split大於block時,將會有部分資料向計算移動,但從原來的單機map,會將所有資料拉取到乙個結點上,考慮來說,這其實也是計算向資料移動。
當split小於block時,一般用於cpu密集型計算,也會有部分資料向計算移動。
即藍色部分為其中第二行中的world單詞被切割了,但是在進行block01
內容,紅色部分為block02
內容。hello world!!!
hello wor
ld!!!
hello world!!!
mapreduce-wordcount
計算時,hadoop框架仍然能返回乙個正確的值給我們,hadoop框架是如何實現的呢?
閱讀原碼可以發現。mapreduce
在使用linerecordreader
時,會判斷,如果當前開始的檔案偏移量不為0時(即不是第乙個切片檔案時),會讀一行空行,即將偏移量移至block的第二行開頭。即示例中,偏移到全文的第三行開頭處,然後在map執行時,框架會將第二個block中沒有被處理的資料移動向第乙個block所處的位置進行計算,這樣就可以解決單詞被block切分開的問題了。
input -> map -> outputnextkeyvalue():
讀取資料中的一條key,value
資料
返回boolean值
getcurrentkey()
getcurrentvalue()
output
:
寫出是通過writer寫出(k, v, p)的格式到快取區(mapoutputbuffer
)中。
mapoutputbuffer
:
init() :
spiller: 0.8 緩衝區溢寫百分比,預設為80%sortmb:100 緩衝區大小
sorter:quicksort 排序演算法
comparator:job.getoutputkeycomparator()
combiner: 用於提前合併相同的key的k-v資料。
minspillsforcombine = 3
spillthread
sortandspill() 主要用於做排序並溢血。
並判斷是否需要合併,若需要合併,則進行。
input -> reduce -> outputriter - input : 會到所有的map端產生的檔案,拉取對應的資料到reduce端,並經過sort歸併排序成乙個大檔案,並將其封裝成乙個迭代器。
而在reduce()方法執行時,也是通過傳遞乙個「假」迭代器values,來將同乙個key的資料載入進行的。
values迭代器通過hasnext()方法判斷是否還有相同key的資料,其實就是通過nextkeyissame
值(下乙個值的key是否與當前key相等)來判斷,當前同乙個key為一組的資料是否還有下一條資料。
而通過next()方法則可以間接呼叫riter的nextkeyvalue()拉取出一條資料,通過再多讀一條資料,判斷下乙個資料是否是與當前資料的key相同,更新nextkeyissame
值。
分組排序器:將大檔案中k-v資料,分成一組進行reduce操作,若使用者沒有指定,預設使用key分組進行。
避免oom:大資料檔案很大,不能全部載入到記憶體中。
減少io成本:同一組中可能會有多個不同的key,這些不同的key需要進入不同的reduce()方法中,只需要真迭代器從頭開始讀,而每次當需要處理新的一組key資料時,只需要通過假迭代器配合真迭代器去讀出資料即可完成reducetask操作,整個reducetask只需要真迭代器進行的一次i/o,假迭代器進行間接呼叫。
MapReduce流程講解以及原始碼分析
對於使用者來說只需要書寫map操作和reduce操作 mapreduce計算資料的時間較長 整個過程分為map和reduce,map負責處理原始資料,reduce負責處理map資料 1.map過程 block 塊 物理上的概念,預設是128m split 切片 本次map任務要處理的資料的大小 預設...
MapReduce原始碼 二
下圖是context類的繼承關係 不同層次的類的互動物件有所不同 mapcontext 關注 recordreaderreader 和 inputsplit split taskinputoutputcontext關注 recordwriteroutput statusreporter report...
從原始碼分析MapReduce的資料切片原理
切片過程從jobsubmitter的writesplits 開始 step into getsplits 開始獲取新的切片 首先會選出format預設最小值和配置檔案中設定的切片最小值 二者中最大的乙個 getformatmainsplitsize 預設返回值為1 getminsplitsize 中...