背景
mapreduce現在基本已經成為分布式並行程式設計框架的bible,很多分布式計算引擎的實現[hadoop][ciel][twister][transformer][mr-mpi][phoenix][dryad]都將mapreduce作為乙個核心的程式設計模型。
mapreduce程式設計模型是什麼?
很多人都認為mapreduce只有這樣兩個過程構成:
map過程:map(k1,v1)
→list(k2,v2)
reduce過程:reduce(k2, list (v2))
→list(v3)
會找到一些圖來輔助對於mapreduce的理解:
哪些是mapreduce過程的關鍵點?
1)map階段的資料本地性的考慮。
2)map中間結果的歸併,由多個spill檔案歸併成乙個大檔案。
3)reduce拷貝資料的過程
對於乙個具體的問題,更多的時候,確定map、reduce過程的操作並不困難,關鍵的問題是mapreduce對於資料的組織和傳輸問題。
1)map是資料本地性、並行化的關鍵步驟。對於乙個大檔案,它起始位置放在**,如何對於map階段劃分成小檔案,如何協調小檔案的節點內部分發和task排程的配合。下面簡單介紹幾個開源版本的做法:
[hadoop]的成功在於hdfs與mapreduce程式設計模型的配合與協作。hdfs按照block來儲存大檔案,每個datanode儲存一定數量的block,這些block的元資料資訊可以通過hdfs的api獲得。hadoop根據block的個數初始化map的個數(預設方式),taskscheduler根據block的位置,排程乙個map task,這裡啟用了data locality,會首先將maptask排程到block所在的tasktracker節點。當然,在hadoop集群高負載的情況下,data locality的命中率不高,berkeley在[eurosys2010 delay scheduling]提出在不滿足data locality的情況下,讓該task等待一會,然後再被排程,這是排程問題,和上面討論的問題還不屬於乙個層次。
[nsdi11 ciel]它的開源版本叫做skywriting,使用的python開發的。該程式語言的好處,開發速度快,對於使用者而言寫程式入門特別低。ciel是乙個基於**的動態dag分布式計算框架,根據作業執行的需求,不斷**出新的task,它對於輸入資料管理之快基本沒有做什麼工作,只是使用乙個全域性的名字空間,保證對於不同節點上檔案檔案訪問的統一介面。例子:
input_data = [ref("ciel://host137/chunk0"),
ref("ciel://host223/chunk1"),
...];
curr = ...; // initial guess at the result.
這種做法的問題在於,使用者需要組織輸入大資料的切分和遷移。在我理解,乙個完整的程式設計模型,必須包括對於要處理資料的組織,不然這塊工作交由上層使用者來維護,體驗不好。
言歸正傳:它是怎麼組織資料給每個map呢?
通過mmap將本地檔案全部匯入記憶體中,然後啟用多個pthread執行緒,預設是8個,將檔案對映的記憶體區域,計算maptask的個數,並且按照maptask的個數切分mmap對映的記憶體,分別交給pthread來處理。在記憶體充足的情況下,把資料全部放在記憶體裡處理,並且沒有map到reduce階段的網路io和磁碟io的開銷,效能還是很不錯的。phoenix的設計從開始就面向的效能卓越的伺服器,這與hadoop架構在普通伺服器或者commodity machine的思想是不同的。
為了寫一篇有營養的文章,我準備接下來在hadoop mapreduce程式設計模型上多說幾句。
hadoop如何實現大檔案與多個maptask的對映
首先,我來給大家提幾個問題?
1)mapreduce處理一般都是大檔案,那麼大檔案要切分成各個小檔案才能處理嗎?
2)mapreduce的過程在什麼階段處理了大檔案,對它生成了索引檔案了嗎?
3)maptask的個數與檔案的什麼引數相關,程式可以設定嗎?
這些問題在初期寫hadoop程式的時候,困擾了我很久,後來隨著**的深入理解,問題才乙個又乙個地揭開。
jobsubmiiter在將作業提交給jobtracker之前,回答了上面所說的三個問題。
兩類split檔案:
在hadoop中有兩類split檔案,一類是the split meta information, 另外一類是the raw split information, the split meta information被jobtracker使用來構建tasks的資料本地性的組織結構,另外乙個類是 the raw split information 指定每乙個maptask需要讀取資料的位置。這兩個類分別為org.apache.hadoop.mapreduce.split.jobsplit.tasksplitmetainfo,org.apache.hadoop.mapreduce.split.splitmetainfo
jobsubmiiter處理生成split檔案的函式:(有注釋)
private int writenewsplits(jobcontext job, path jobsubmitdir) throws ioexception,interruptedexception, classnotfoundexception
MapReduce程式設計模型
計算採用一組輸入鍵 值對,並產生一組輸出鍵 值對。mapreduce庫的使用者將計算表達為兩個函式 map和reduce。input1 map a,1 b,1 c,1 input2 map b,1 input3 map a,1 c,1 reduce c,2 reduce b,2 reduce a,2...
MapReduce 程式設計模型
mapreduce 簡介 mapreduce 本身是一種支援並行運算的程式設計模型 思想 這個程式設計模型分為兩個階段 map 階段和 reduce 階段。hadoop 的 mapreduce 框架 hadoop 的 mapreduce 是實現 mapreduce 程式設計模型的乙個分布式計算框架,...
MapReduce 程式設計模型
mapreduce 是一種簡化平行計算的程式設計模型,用於大資料量的計算。它的核心思想是 分散任務,彙總結果 將大規模資料集的操作分發給乙個主節點管理下的各個子節點共同完成,然後整合各個子節點的中間結果,從而得到最終結果。mapreduce的優點 1 便於程式設計 mapreduce 只需簡單地實現...