MapReduce 分布式計算框架

2021-10-09 16:07:03 字數 3938 閱讀 8934

mapreduce是分布式計算框架,它將大型資料操作作業分解為可以跨伺服器集群並行執行的單個任務,適用於大規模資料處理場景,每個job包含map和reduce兩部分

分而治之:簡化平行計算的程式設計模型

構建抽象模型:map和reduce

隱藏系統層細節:開發人員專注於業務邏輯實現

優點:

缺點:@override

protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception

}} reduce

public class wcreduce extends reducer

context.write(key,new intwritable(total));}}

driver

public class wcdriver ;

fileinputformat.setinputpaths(job,new path(path[0]));

fileoutputformat.setoutputpath(job, new path(path[1]));

boolean result =job.waitforcompletion(true);

system.out.println(result?"成功":"失敗");

system.exit(result?0:1);}}

mapreduce程式設計總結inputformat介面

常用實現類為:

切塊(block):hdfs上的物理切割,是乙個物理概念,通常情況下,切塊的個數等於切片的個數

block和split的區別:

①block是資料的物理表示

②split是塊中資料的邏輯表示

③split劃分是在記錄的邊界處

④split的數量應不大於block的數量(一般相等)

①combiner相當於本地化的reduce操作

②在shuffle之前進行本地聚合

③用於效能優化,可選項

④輸入和輸出型別一致

⑤reduce可以被用作conbiner的條件

⑥符合交換律和結合律

⑦實現conbiner類需要繼承rreduce類,

⑧driver類中設定job.setconbinerclass()

①用於在map端對key進行分割槽

②預設使用的是hashpartitioner ,取key的雜湊值,使用key的雜湊值對reduce任務數求模,決定每調記錄應該送到那個reducer處理

③自定義partitioner,需要繼承抽象類partitioner,重寫getpartition方法

④driver類中設定job.setpartitioerclass()

⑤當設定的分割槽數與partitioner類不匹配時

常用實現類

input split 即輸入分片,資料在進行 map 計算之前,mapreduce 會根據輸入檔案進行切分,因為我們需要分布式的進行計算嘛,那麼我得計算出來我的資料要切成多少片,然後才好去對每片資料分配任務去處理。

每個輸入分片會對應乙個 map 任務,輸入分片儲存的並非資料本身,而是乙個分片長度和乙個記錄資料的位置資料,它往往是和 hdfs 的 block(塊) 進行關聯的。

假如我們設定每個 hdfs 的塊大小是為預設的 128m,如果我們現在有3個檔案,大小分別是 10m,129m,200m,那麼mapreduce 對把 10m 的檔案分為乙個分片,129m 的資料檔案分為2個分片,200m 的檔案也是分為兩個分片。那麼此時我們就有 5 個分片,就需要5個 map 任務去處理,而且資料還是不均勻的。

如果有非常多的小檔案,那麼就會產生大量的 map 任務,處理效率是非常低下的。這個階段使用的是 inputformat 元件,它是乙個介面 ,預設使用的是 textinputformat 去處理,他會呼叫 readrecord() 去讀取資料。

小檔案處理是mapreduce 計算優化的乙個非常重要的乙個點。可以通過如下方法:

①最好的辦法:在資料處理系統的最前端(預處理、採集),就將小檔案先進行合併了,再傳到 hdfs 中去。

②補救措施:如果已經存在大量的小檔案在hdfs中了,可以使用另一種 inputformat 元件combinefileinputformat 去解決,它的切片方式跟 textinputformat 不同,它會將多個小檔案從邏輯上規劃到乙個切片中,這樣,多個小檔案就可以交給乙個 map 任務去處理了。

將 map 階段的輸出作為 reduce 階段的輸入的過程就是 shuffle 。 這也是整個 mapreduce 中最重要的乙個環節。

一般mapreduce 處理的都是海量資料,map 輸出的資料不可能把所有的資料都放在記憶體中,當我們在map 函式中呼叫 context.write() 方法的時候,就會呼叫 outputcollector 元件把資料寫入到處於記憶體中的乙個叫環形緩衝區的東西。

環形緩衝區預設大小是 100m ,但是只寫80%,同時map還會為輸出操作啟動乙個守護執行緒,當到資料達到80%的時候,守護執行緒開始清理資料,把資料寫到磁碟上,這個過程叫 spill 。

資料在寫入環形緩衝區的時候,資料會預設根據key 進行排序,每個分割槽的資料是有順序的,預設是 hashpartitioner。當然了,我們也可以去自定義這個分割槽器。

每次執行清理都產生乙個檔案,當 map 執行完成以後,還會有乙個合併檔案檔案的過程,其實他這裡跟 map 階段的輸入分片(input split)比較相似,乙個 partitioner 對應乙個 reduce 作業,如果只有乙個 reduce 操作,那麼 partitioner 就只有乙個,如果有多個 reduce 操作,那麼 partitioner 就有多個。partitioner 的數量是根據 key 的值和 reduce 的數量來決定的。可以通過 job.setnumreducetasks() 來設定。

這裡還有乙個可選的元件 combiner ,溢位資料的時候如果呼叫 combiner 元件,它的邏輯跟 reduce 一樣,相同的key 先把 value 進行相加,前提是合併並不會改變業務,這樣就不糊一下傳輸很多相同的key 的資料,從而提公升效率。

舉個例子,在溢位資料的時候,預設不使用 combiner,資料是長這樣子: ,,。 當使用 combiner 元件時,資料則是: ,。把 a 的資料進行了合併。

在執行 reduce 之前,reduce 任務會去把自己負責分割槽的資料拉取到本地,還會進行一次歸併排序並進行合併。

reduce 階段中的 reduce 方法,也是我們自己實現的邏輯,跟map 階段的 map 方法一樣,只是在執行 reduce 函式的時候,values 為 同一組 key 的value 迭代器。在 wordcount 的例子中,我們迭代這些資料進行疊加。最後呼叫 context.write 函式,把單詞和總數進行輸出。

在 reduce 函式中呼叫 context.write 函式時,會呼叫 outputfomart 元件,預設實現是 textoutputformat ,把資料輸出到目標儲存中,一般是 hdfs。

面試題:

mapreduce執行流程

mapreduce shuffle過程

maptask個數如何調整

分布式計算框架MapReduce

mapreduce思想在生活中處處可見。或多或少都曾接觸過這種思想。mapreduce的思想核心是 分而治之 適用於大量複雜的任務處理場景 大規模資料處理場景 map負責 分 即把複雜的任務分解為若干個 簡單的任務 來並行處理。可以進行拆分的前提是這些小任務可以平行計算,可以提高並行度。彼此間幾乎沒...

分布式計算模型MapReduce

1 需求 統計乙個檔案中每乙個單詞出現的總次數。2 案例資料 3 編寫reducer類 public class wordcountreducer extends reducer v.set sum context.write key,v 4 編寫驅動類 public class wordcount...

分布式平行計算MapReduce

1.用自己的話闡明hadoop平台上hdfs和mapreduce的功能 工作原理和工作過程。hdfs 1 第一次啟動 namenode 格式化後,建立 fsimage 和 edits 檔案。如果不是第一次啟動,直接載入編輯日誌和映象檔案到記憶體。2 客戶端對元資料進行增刪改的請求。3 namenod...