Analyzing the execution flow with WordCount
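For reference, here is a minimal, self-contained version of the WordCount program analyzed below; the master URL and input path are illustrative placeholders, not values from the original article.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // "local[*]" and "input.txt" are placeholder values for a local test run.
    val sc = new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]"))
    val lines = sc.textFile("input.txt")
    val words = lines.flatMap(line => line.split(" "))
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)
    counts.foreach(count => println(count._1 + ": " + count._2))
    sc.stop()
  }
}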
val lines = sc.textFile(...)
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
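  // Body lightly abridged from the Spark 2.x source; read it as a sketch rather than verbatim code.
  // hadoopFile yields an RDD[(LongWritable, Text)]; the map keeps only each line's text.
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}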
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
// RDD itself does not define reduceByKey, so calling reduceByKey() on an RDD triggers a Scala implicit conversion: the compiler searches the enclosing scope, finds the rddToPairRDDFunctions() implicit in the RDD companion object, and wraps the RDD in a PairRDDFunctions.
// The reduceByKey() that actually runs is then the one defined on PairRDDFunctions.
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))
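The implicit conversion mentioned above lives in the RDD companion object (since Spark 1.3); a lightly simplified version looks like this, so treat it as a sketch of the real source rather than a verbatim copy:

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  // Wrapping the RDD[(K, V)] exposes key-based operations such as reduceByKey.
  new PairRDDFunctions(rdd)
}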
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Hand the job to the DAGScheduler that was created earlier, when the SparkContext was initialized.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
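For context, this is roughly how the foreach action used above reaches SparkContext.runJob; the snippet is adapted from RDD.scala and slightly abridged, so treat it as a sketch:

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  // foreach is an action: it submits a job right away. This runJob overload wraps the
  // per-partition function and runs it on every partition, eventually reaching the
  // SparkContext.runJob shown above.
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}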