概述:rdd是分布式資料集,代表了不可變、分割槽的元素集合,這些元素可以並行操作。
rdd有五個主要屬性:
* -partition列表,和hadoop類似, 可切分的資料才能平行計算
* -計算每個split的function,rdd裡面的compute函式
* -對於其他rdd的依賴列表,分寬、窄(依賴)兩種,不是所有的rdd都有
* -(可選)對於key-value型別的rdd來講,乙個分割槽器,(比如:來說明這rdd是hash分割槽的),類似於hadoop的partitioner介面,控制key分配到那個reduce。
* -(可選)計算每個split的preferred location(比如:乙個hdfs檔案的塊location)。
對應於原始碼,有4個方法和乙個屬性:
protected def getpartitions: array[partition]
protected def getdependencies: seq[dependency[_]] = deps
protected def getpreferredlocations(split: partition): seq[string] = nil
def compute(split: partition, context: taskcontext): iterator[t]
@transient val partitioner: option[partitioner] = none
乙個簡單程式,wordcount
val data=sc.textfile(「data.txt」)
val flatrdd=data.flatmap(s=>s.split(「\\s+」))
val filterrdd=flatrdd.filter(_.length>=2)
val maprdd=filterrdd.map(work=>(word,1))
val reduce=maprdd.reducebykey(_+_)
對應原始碼:
hadoopfile方法,裡面我們看到它做了3個操作。
a、 把hadoop的配置檔案儲存到廣播變數裡。
b、設定路徑的方法
c、new了乙個hadooprdd返回
hadooprdd這個類的getpartitions、compute、getpreferredlocations。
先看getpartitions,它的核心**如下:
val inputsplits = inputformat.getsplits(jobconf, minpartitions)
val array = new array[partition](inputsplits.size)
for (i <- 0 until inputsplits.size)
它呼叫的是inputformat自帶的getsplits方法來計算分片,然後把分片hadooppartition包裝到到array裡面返回。
看compute方法,它的輸入值是乙個partition,返回是乙個iterator[(k, v)]型別的資料,這裡面我們只需要關注2點即可。
1、把partition轉成hadooppartition,然後通過inputsplit建立乙個recordreader
2、 重寫iterator的genext方法,通過建立的reader呼叫next方法讀取下乙個值。
//把partition轉化成hadooppartition
val split = thesplit.asinstanceof[hadooppartition]
loginfo("input split: " + split.inputsplit)
var reader: recordreader[k, v] = null
val jobconf = getjobconf()
val inputformat = getinputformat(jobconf)
hadooprdd.addlocalconfiguration(new ******dateformat("yyyymmddhhmm").format(createtime),
context.stageid, thesplit.index, context.attemptid.toint, jobconf)
//通過inputformat建立乙個recordreader
reader = inputformat.getrecordreader(split.inputsplit.value, jobconf, reporter.null)
override def getnext() = catch
(key, value)
}
從這裡我們可以看得出來compute方法是通過分片來獲得iterator介面,以遍歷分片的資料。
getpreferredlocations方法就更簡單了,直接呼叫inputsplit的getlocations方法獲得所在的位置。
有關依賴:
sc.textfile方法呼叫hadoopfile方法後(返回hadooprdd),緊接著呼叫map方法,hadooprdd中沒有此方法,呼叫父類(rdd)方法,
def this(@transient oneparent: rdd[_]) =
this(oneparent.context , list(new onetoonedependency(oneparent)))
會繼續呼叫rdd的主構造器。此處的oneparent就是傳入的hadooprdd例項。再看rdd的主構造器:
abstract class rdd[t: classtag](
@transient private var sc: sparkcontext,
@transient private var deps: seq[dependency[_]]
) extends serializable with logging
//dependencies是乙個seq,head方法獲取的是其第乙個元素,dependencies.head返回的是乙個dependency,dependency有乙個rdd方法,返回和dependency的rdd。
protected[spark] def firstparent[u: classtag] =
到這兒基本明確了firstparent是什麼,繼續深入就比較複雜,涉及到checkpoint的東西,比較不好理解。
dependencies是乙個dependency的序列,
private var dependencies_ : seq[dependency[_]] = null
//以seq的形式,返回乙個rdd的dependendies。
final def dependencies: seq[dependency[_]] =
dependencies_
} }
在獲取dependencies的過程中,用到了checkpointrdd,
private def checkpointrdd: option[rdd[t]] = checkpointdata.flatmap(_.checkpointrdd)
此時用到了checkpointdata,是option型別,其實例是scala.some或者none的例項,some[a]代表了乙個a型別的值。
private[spark] var checkpointdata: option[rddcheckpointdata[t]] = none
這個類包含了所有和rdd checkpointing相關的資訊。這個類的每乙個例項都和乙個rdd相關。
現在回到dependencies方法,如果存在checkpointrdd的話,則從checkpointrdd獲取依賴資訊,如果不存在,則呼叫getdependencies方法回去依賴。
protected def getdependencies: seq[dependency[_]] = deps
implicit def rddtopairrddfunctions[k, v](rdd: rdd[(k, v)])
(implicit kt: classtag[k], vt: classtag[v], ord: ordering[k] = null) =
在sparkcontext.scala中定義了很多隱式轉化,當找不到相應方法是,可以去查下隱式轉換。
reducebykey的**:
def reducebykey(partitioner: partitioner, func: (v, v) => v): rdd[(k, v)] =
可以看到呼叫了combinebykey方法,下面是其主要**:
如果沒有,則會建立新的shuffledrdd.
Spark RDD原始碼閱讀01
rdd是什麼 resilient distributed dataset 一 rdd的特徵屬性 二 rdd的執行job的流程 rdd 這些方法是判斷這個job結束的標誌,然後開始執行job。11 1號表示獲取當前shuffleddep.rdd的依賴的shufflerdd,2號表示對所依賴的shuff...
spring原始碼分析 spring原始碼分析
1.spring 執行原理 spring 啟動時讀取應用程式提供的 bean 配置資訊,並在 spring 容器中生成乙份相應的 bean 配置登錄檔,然後根據這張登錄檔例項化 bean,裝配好 bean 之間的依賴關係,為上 層應用提供準備就緒的執行環境。二 spring 原始碼分析 1.1spr...
思科VPP原始碼分析(dpo機制原始碼分析)
vpp的dpo機制跟路由緊密結合在一起。路由表查詢 ip4 lookup 的最後結果是乙個load balance t結構。該結構可以看做是乙個hash表,裡面包含了很多dpo,指向為下一步處理動作。每個dpo都是新增路由時的乙個path的結果。dpo標準型別有 dpo drop,dpo ip nu...