前言
前面分析了task scheduler劃分task的過程,task在task schedulerimpl類中被包裝成stagetasksets,然後由driverendpoint傳送,最後由coarsegrainedschedulerbackend序列化並啟動executor。現在開始分析executor執行任務的過程。正文
coarsegrainedschedulerbackend傳送一條lunchtask
訊息後(executordata.executorendpoint.send(launchtask(new serializablebuffer(serializedtask)))
),executor收到訊息,開始執行lunchtask方法。
case
launchtask
(data)
=>
if(executor == null)
else
executor.launchtask(this, taskdesc)
把任務詳情也傳入進去,然後啟動乙個taskrunner,並把taskdesc也傳入進去:val tr = new taskrunner(context, taskdescription)
,這個taskrunner會把task反序列化出來,並且這個run()方法會在獲取結果時執行task.run()方法。
task = ser.deserialize[task[any]](
taskdescription.serializedtask, thread.currentthread.getcontextclassloader)
task.localproperties = taskdescription.properties
task.
settaskmemorymanager
(taskmemorymanager)
## 執行task.
run(
)方法 val res = task.
run(
taskattemptid = taskid,
attemptnumber = taskdescription.attemptnumber,
metricssystem = env.metricssystem)
threwexception =
false
res
執行task.run()
方法,會執行runtask()
方法,這個runtask()方法沒有實現,因為task有兩類,一類是shufflemaptask,一類是resulttask。所以需要各自實現runtask()方法。我們先看一下shufflemaptask的runtask()方法。
override def runtask
(context: taskcontext)
: mapstatus =
else
0l val ser = sparkenv.get.closureserializer.
newinstance()
val (rdd, dep)
= ser.deserialize[
(rdd[_]
, shuffledependency[_, _, _])]
( bytebuffer.
wrap
(taskbinary.value)
, thread.currentthread.getcontextclassloader)
_executordeserializetime = system.
currenttimemillis()
- deserializestarttime
_executordeserializecputime =
if(threadmxbean.iscurrentthreadcputimesupported)
else
0l var writer: shufflewriter[any, any]
= null
trycatch
}catch
throw e
}}
val (rdd, dep) = ser.deserialize[(rdd[_], shuffledependency
這條**就是從反序列化的資料裡拿到stage的最後乙個rdd和它的依賴,並通過sparkenv建立了乙個manager,然後通過manager建立了乙個shuffle writer,這個shuffle writer就是將任務的計算結果寫到本地磁碟的角色,所以它很重要,shuffle writer也有3種,這裡不詳細講。此外,還有shuffle reader負責拉取shuffle writer寫入的資料。這個以後會詳細寫。writer呼叫write()方法將資料迭代地寫入磁碟。總結
executor 執行task的過程寫的比較簡單,但其實裡面涉及到比較多的知識點,比如sparkenv,shuffle writer等等,接下來會分別介紹這些知識點。
spark Executor啟動過程分析
本篇文章將以問答的方式對executor的啟動進行分析。首先會尋找可用的 worker 節點來啟動 executor 所謂可用就是前面提到的executor在worker上啟動的條件 worker 節點資源分配是按照如下規則進行的 過濾不可用的work,輪詢可用的work 分配給 executor ...
執行完畢再往下執行?
執行完畢再往下執行?左直拳我寫了一段 如下 busy 進行大工作量,耗時的操作 bigsetup free 其中,函式 busy 的作用是將滑鼠游標設為代表等待的沙漏形狀,並且顯示一幅忙碌的,而函式 free 則正好相反,將游標設回預設的箭頭狀,並顯示清閒的 private void busy pr...
同步執行和非同步執行
同步執行模式 所謂同步執行模式,是指語句在同步執行模式下,將始終保持對程式流的控制,直至 程式結束。如查詢操作,客戶機上的應用程式在向伺服器發出查詢操作的指令後,將 一直等待伺服器將查詢結果返回客戶機端,然後才繼續進行下一步操作。眾所周知,應用程式要從乙個大表中刪除所有的記錄將是非常耗時的,如果應用...