1. 從job提交流程的(2)--><9> 進去
job job = new job(jobid.downgrade(jobid), jobsubmitdir); 構造真正執行的job , localjobrunnber$job
2. localjobrunnber$job 的run()方法
1) tasksplitmetainfo tasksplitmetainfos =
splitmetainforeader.readsplitmetainfo(jobid, localfs, conf, systemjobdir);
// 讀取job.splitmetainfo
2) int numreducetasks = job.getnumreducetasks(); // 獲取reducetask個數
3) listmaprunnables = getmaptaskrunnables(
tasksplitmetainfos, jobid, mapoutputfiles);
// 根據切片的個數, 建立執行maptask的 maptaskrunnable
4) executorservice mapservice = createmapexecutor(); // 建立執行緒池
5) runtasks(maprunnables, mapservice, "map"); //執行 maptaskrunnable
6) 因為runnable提交給執行緒池執行,接下來會執行maptaskrunnable的run方法。
7) 執行 localjobrunner$job$maptaskrunnable 的run()方法.
(1) maptask map = new maptask(systemjobfile.tostring(), mapid, taskid,
info.getsplitindex(), 1); //建立maptask物件
(2) map.run(localconf, job.this); //執行maptask中的run方法
① org.apache.hadoop.mapreduce.taskattemptcontext taskcontext = jobcontextimpl
③ org.apache.hadoop.mapreduce.inputformatinputformat = textinputformat
④ split = getsplitdetails(new path(splitindex.getsplitlocation()),
splitindex.getstartoffset()); // 重構切片物件
切片物件的資訊 : file:/d:/input/inputword/janeeyre.txt:0+36306679
⑤ org.apache.hadoop.mapreduce.recordreaderinput = maptask$nettrackingrecordreader
⑥ output = new newoutputcollector(taskcontext, job, umbilical, reporter); //構造緩衝區物件
[1] collector = createsortingcollector(job, reporter); //獲取緩衝區物件
maptask$mapoutputbuffer
. collector.init(context); //初始化緩衝區物件
1>>.final float spillper =
job.getfloat(jobcontext.map_sort_spill_percent, (float)0.8);
// 溢寫百分比 0.8
2>>.final int sortmb = job.getint(mrjobconfig.io_sort_mb,
mrjobconfig.default_io_sort_mb);
// 緩衝區大小 100m
3>>.sorter = reflectionutils.newinstance(job.getclass(
mrjobconfig.map_sort_class, quicksort.class,
indexedsorter.class), job);
// 排序物件
// 排序使用的是快排,並且基於索引排序。
4>> . // k/v serialization // kv序列化
5>> . // output counters // 計數器
6>> . // compression // 壓縮
7>> . // combiner // combiner
map(context.getcurrentkey(), context.getcurrentvalue(), context);
context.write(outk,outv);
MapTask工作機制
階段 maptask 通過使用者編寫的 recordreader 從輸入 inputsplit 中解析出乙個個 key value。2 map 階段 該節點主要是將解析出的 key value 交給使用者編寫 map 函式處理,並產生一系列新的 key value。3 collect 收集階段 在使...
MapTask工作機制
1 並行度決定機制 1 問題引出 maptask的並行度決定map階段的任務處理併發度,進而影響到整個job的處理速度。那麼,maptask並行任務是否越多越好呢?這種說法是不對的,maptask多的話,消耗的資源就多 cpu,記憶體等等 maptask少的話,執行效率就會變低。2 maptask並...
MapTask工作機制
maptask並行度決定map階段的任務處理併發度,進而影響job的處理速度 maptask 並行度決定機制 乙個job的map階段並行度 個數 由客戶端提交job時的切片個數決定 乙個job的map階段並行度由客戶端在提交job時決定 每乙個split切片分配乙個maptask 預設 切片大小 b...