通過提交jar包,進行mapreduce處理,那麼整個執行過程分為五個環節:
1、向client端提交mapreduce job.
2、隨後yarn的resourcemanager進行資源的分配.
3、由nodemanager進行載入與監控containers.
5、通過hdfs進行job配置檔案、jar包的各節點分發。
job 提交過程
job的提交通過呼叫submit()方法建立乙個jobsubmitter例項,並呼叫submitjobinternal()方法。整個job的執行過程如下:
2、檢查output的路徑是否正確,是否已經被建立。
3、計算input的splits。
4、拷貝執行job 需要的jar包、配置檔案以及計算input的split 到各個節點。
job 初始化過程
task 任務分配
2、執行tasks的是需要消耗記憶體與cpu資源的,預設情況下,map和reduce的task資源分配為1024mb與乙個核,(可修改執行的最小與最大引數配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)
task 任務執行
2、yarnchild執行於乙個專屬的jvm中,所以任何乙個map或reduce任務出現問題,都不會影響整個nodemanager的crash或者hang。
3、每個task都可以在相同的jvm task中完成,隨之將完成的處理資料寫入臨時檔案中。
mapreduce資料流
執行進度與狀態更新
1、mapreduce是乙個較長執行時間的批處理過程,可以是一小時、幾小時甚至幾天,那麼job的執行狀態監控就非常重要。每個job以及每個task都有乙個包含job(running,successfully completed,failed)的狀態,以及value的計數器,狀態資訊及描述資訊(描述資訊一般都是在**中加的列印資訊),那麼,這些資訊是如何與客戶端進行通訊的呢?
job 完成
shuffle與sort
從map到reduce的過程,被稱之為shuffle過程,mapreduce使到reduce的資料一定是經過key的排序的,那麼shuffle是如何運作的呢?
當map任務將資料output時,不僅僅是將結果輸出到磁碟,它是將其寫入記憶體緩衝區域,並進行一些預分類。
首先map任務的output過程是乙個環狀的記憶體緩衝區,緩衝區的大小預設為100mb(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體的大小到達一定比例,預設為80%(可通過mapreduce.map.sort.spill.percent配置項修改),便開始寫入磁碟。
在寫入磁碟之前,執行緒將會指定資料寫入與reduce相應的patitions中,最終傳送給reduce.在每個partition中,後台執行緒將會在記憶體中進行key的排序,(如果**中有combiner方法,則會在output時就進行sort排序,這裡,如果只有少於3個寫入磁碟的檔案,combiner將會在outputfile前啟動,如果只有乙個或兩個,那麼將不會呼叫)
這裡將map輸出的結果進行壓縮會大大減少磁碟io與網路傳輸的開銷(配置引數mapreduce.map .output.compress 設定為true,如果使用第三方壓縮jar,可通過mapreduce.map.output.compress.codec進行設定)
隨後這些paritions輸出檔案將會通過http傳送至reducers,傳送的最大啟動執行緒通過mapreduce.shuffle.max.threads進行配置。
2、the reduce side
首先上面每個節點的map都將結果寫入了本地磁碟中,現在reduce需要將map的結果通過集群拉取過來,這裡要注意的是,需要等到所有map任務結束後,reduce才會對map的結果進行拷貝,由於reduce函式有少數幾個複製執行緒,以至於它可以同時拉取多個map的輸出結果。預設的為5個執行緒(可通過修改配置mapreduce.reduce.shuffle.parallelcopies來修改其個數)
這裡有個問題,那麼reducers怎麼知道從哪些機器拉取資料呢?
map的結果將會被拷貝到reduce task的jvm的記憶體中(記憶體大小可在mapreduce.reduce.shuffle.input.buffer.percent中設定)如果不夠用,則會寫入磁碟。當記憶體緩衝區的大小到達一定比例時(可通過mapreduce.reduce.shuffle.merge.percent設定)或map的輸出結果檔案過多時(可通過配置mapreduce.reduce.merge.inmen.threshold),將會除法合併(merged)隨之寫入磁碟。
這時要注意,所有的map結果這時都是被壓縮過的,需要先在記憶體中進行解壓縮,以便後續合併它們。(合併最終檔案的數量可通過mapreduce.task.io.sort.factor進行配置) 最終reduce進行運算進行輸出。
**
git hub 提交過程
git init 初始化本地倉庫 git add 將 放到提交區 git commit m commit 提交到要地倉庫,並寫一些注釋 m 代表注釋 git remote add origin git github.com shengxuanyan xiaofufu.git 連線遠端倉庫並建了乙個名...
Spark on Yarn客戶端作業提交過程分析
我們將以乙個spark streaming為例,閱讀spark相關原始碼,簡述spark on yarn客戶端模式下作業提交流程。作業是通過spark submit指令碼提交的,因此整個流程從spark submit 開始分析。若有錯誤,希望各位看官指出。通過submit獲取提交 的mainclas...
spark提交過程分析(standalone模式)
2.1.所有executor都反向註冊到driver上之後,driver結束sparkcontext初始化,會繼續執行我們編寫的 2.2.每執行乙個action就會建立乙個job,job會提交給dagscheduler 2.3 dagscheduler會採用自己的stage劃分演算法將job劃分為多...