繼續前一篇的內容。前一篇內容為:
spark中client原始碼分析(一)client中唯一的main方法如下:
def main(args: array[string])
val conf = new sparkconf()
val driverargs = new clientarguments(args)
if (!driverargs.loglevel.isgreaterorequal(level.warn))
conf.set("spark.rpc.asktimeout", "10")
conf.set("akka.loglevel", driverargs.loglevel.tostring.replace("warn", "warning"))
logger.getrootlogger.setlevel(driverargs.loglevel)
//建立乙個driverclient的rpc環境,並將得到master和client的遠端引用
val rpcenv =
rpcenv.create("driverclient", utils.localhostname(), 0, conf, new securitymanager(conf))
val masterendpoints = driverargs.masters.map(rpcaddress.fromsparkurl).
map(rpcenv.setupendpointref(master.system_name, _, master.endpoint_name))
//clientpoint
rpcenv.setupendpoint("client", new clientendpoint(rpcenv, driverargs, masterendpoints, conf))
//啟動rpc環境
rpcenv.awaittermination()
}
case "kill" =>
val driverid = driverargs.driverid
ayncsendtomasterandforwardreplykilldriverresponse}}
(3)onstop方法簡單,略過。
(4)receive方法如下,
override def receive: partialfunction[any, unit] = else if (!utils.responsefrombackup(message))
case killdriverresponse(master, driverid, success, message) =>
loginfo(message)
if (success) else if (!utils.responsefrombackup(message))
}
①pollandreportstatus方法如下,用於找到driver的資訊然後退出jvm
def pollandreportstatus(driverid: string) ")
//返回的其實是worker的資訊
(statusresponse.workerid, statusresponse.workerhostport, statusresponse.state) match
statusresponse.exception.map
system.exit(0)}}
原 Spark中Client原始碼分析(一)
下面我們重點看clientendpoint,它是執行緒安全的。registerwithmaster方法如下,用於非同步註冊到所有的master上,如果沒有超過再次註冊的次數 3次 那麼每20s將會重新呼叫該方法申請註冊,如果註冊成功,所有的呼叫work和futures將會被取消。private de...
Mybatis中Logging模組的原始碼分析
週末又來到了公司吹吹空調,順便記錄下mybatis的點點滴滴。首先mybatis不定義日誌系統,完全依賴於第三方系統完成日誌記錄,利用介面卡模式完成實際操作,原始碼如下 1 定義乙個log介面,具有如下方法 public inte ce log2 然後引入第三方日誌,比如 loggingimpl,實...
原 Spark中Master原始碼分析(二)
spark中master原始碼分析 一 2 恢復完畢,重新建立driver,完成資源的重新分配 case completerecovery completerecovery 詳見下 relaunchdriver方法如下,將driver的轉態為relaunching,新增到即將建立的driver列表中...