Spark 之 SparkContext 原理剖析

2021-09-23 23:45:58 字數 3195 閱讀 2074

driver 程序被啟動時,會例項化 sparkcontext 物件,然後 sparkcontext 在構建 dagscheduler 和 taskscheduler 物件。這句話在 spark學習筆記之排程 基本上都會被提及,這篇就從原始碼角度來剖析這個問題。

首先從 sparkcontext 原始碼入手:

-- sparkcontext.scala

// 初始化 taskscheduler

val (sched, ts) = sparkcontext.createtaskscheduler(this, master, deploymode)

_schedulerbackend = sched

_taskscheduler = ts

//初始化 dagscheduler

_dagscheduler = new dagscheduler(this)

_heartbeatreceiver.ask[boolean](taskschedulerisset)

來看看 taskscheduler 的初始化操作

-- sparkcontext.scala

/*根據跟定的 url 來建立 task scheduler,這裡返回 schedulerbackend, taskscheduler 兩個物件,也就是說 schedulerbackend 和 taskscheduler 分別被例項化了。

*/ private def createtaskscheduler(

sc: sparkcontext,

master: string,

deploymode: string): (schedulerbackend, taskscheduler) =

}

在例項化 schedulerbackend, taskscheduler 時,會建立乙個 schedulerpool,schedulerpool 會判斷 fair 和 fifo 方式來建立。

-- taskschedulerimpl.scala

def initialize(backend: schedulerbackend)

}schedulablebuilder.buildpools()

}

taskscheduler 是乙個底層排程介面,實際執行操作在org.apache.spark.scheduler.taskschedulerimpl。而其底層通過操作乙個 schedulerbackend,針對不同種類的 cluster 排程方式(standalone,yarn,mesos)來排程 task。

客戶端可以呼叫 initialize 和 start 方法,然後通過 runtasks 方法來提交 task sets。

初始化完 schedulerbackend, taskscheduler 就會啟動 taskscheduler (_taskscheduler.start()),最終會呼叫 standaloneschedulerbackend.start()

-- standaloneschedulerbackend.scala

override def start()

weburl, sc.eventlogdir, sc.eventlogcodec, coresperexecutor, initialexecutorlimit)

client.start()

waitforregistration()

}

def start()

/*** 註冊到 所有的 master 上,返回乙個 array[future].

*/private def tryregisterallmasters(): array[jfuture[_]] =

loginfo("connecting to master " + masteraddress.tosparkurl + "...")

val masterref = rpcenv.setupendpointref(masteraddress, master.endpoint_name)

} catch })}

}再來看看 dagscheduler 的流程。

dagscheduler 是實現了 面向 stage 的排程的 高層次的排程層,它可以為每個 job 計算出乙個 dag,追蹤 rdd和 stage 的輸出是否被持久化,並且尋找到乙個最優排程機制來執行 job,它會將 stage 作為 taskset 提交到底層的 taskscheduler 來傳送到集群上執行這些 task。此外它還決定了執行每個 task 的最佳位置,基於當前的快取狀態,將這些最佳位置提交給 底層的 taskscheduler。兵器,它會處理由於 shuffle 輸出檔案丟失導致的失敗,在這種情況下,舊的 stage 可能會被重新提交。乙個 stage 內部的失敗,如果不是由於 shuffle檔案丟失導致的,會被 taskscheduler 處理,它會被多次重試每乙個 task,直到最後乙個。實在不行,才會被取消整個 stage。

可以發現,dagscheduler 底層是基於呼叫 dagschedulereventprocessloop

private[spark] val eventprocessloop = new dagschedulereventprocessloop(this)

sparkui 是通過例項化 sparkui 來實現的

-- sparkcontext.scala

_ui =

if (conf.getboolean("spark.ui.enabled", true)) else

// 在開始執行 task 任務時,繫結通訊端埠

_ui.foreach(_.bind())

-- sparkui.scala

/*** 根據儲存的應用狀態來建立 sparkui

*/def create(

sc: option[sparkcontext],

conf: sparkconf,

securitymanager: securitymanager,

basepath: string,

starttime: long,

}

如圖:

Spark 原始碼分析之SparkContext

saprkcontext非常重要,是spark提交任務到集群的入口 sparkcontext中沒有main方法,在sparkcontext主構造器中,主要做一下四件事情 1.呼叫createsparkenv 建立sparkenv,sparkenv中能夠得到actorsystem物件,用於建立acto...

Spark學習(一)之Spark初識

1.spark歷史及簡介 spark是乙個實現快速通用的集群計算平台。它是由加州大學伯克利分校amp實驗室 開發的通用記憶體平行計算框架,用來構建大型的 低延遲的資料分析應用程式。它擴充套件了廣泛使用的mapreduce計算模型。12年正式開源,距今6年歷史。spark執行架構的設計 cluster...

初始spark 四 之spark儲存管理

我們在使用spark進行資料相關的操作的時候,經常會用到的是rdd,但是我們也都知道rdd是乙個抽象的資料集,並不是真正的資料儲存的地方,rdd使我們對資料的操作更方便,其實rdd的出現避免了我們對資料儲存底部的接觸,可以更方便的編寫我們的應用。其實資料的儲存都是由spark的儲存管理模組實現和管理...