Spark RDD 原始碼分析

2021-07-04 17:13:19 字數 4486 閱讀 2062

概述:rdd是分布式資料集,代表了不可變、分割槽的元素集合,這些元素可以並行操作。

rdd有五個主要屬性:

* -partition列表,和hadoop類似, 可切分的資料才能平行計算

* -計算每個split的function,rdd裡面的compute函式

* -對於其他rdd的依賴列表,分寬、窄(依賴)兩種,不是所有的rdd都有

* -(可選)對於key-value型別的rdd來講,乙個分割槽器,(比如:來說明這rdd是hash分割槽的),類似於hadoop的partitioner介面,控制key分配到那個reduce。

* -(可選)計算每個split的preferred location(比如:乙個hdfs檔案的塊location)。

對應於原始碼,有4個方法和乙個屬性:

protected def getpartitions: array[partition]

protected def getdependencies: seq[dependency[_]] = deps

protected def getpreferredlocations(split: partition): seq[string] = nil

def compute(split: partition, context: taskcontext): iterator[t]

@transient val partitioner: option[partitioner] = none

乙個簡單程式,wordcount

val data=sc.textfile(「data.txt」)

val flatrdd=data.flatmap(s=>s.split(「\\s+」))

val filterrdd=flatrdd.filter(_.length>=2)

val maprdd=filterrdd.map(work=>(word,1))

val reduce=maprdd.reducebykey(_+_)

對應原始碼:

hadoopfile方法,裡面我們看到它做了3個操作。

a、 把hadoop的配置檔案儲存到廣播變數裡。

b、設定路徑的方法

c、new了乙個hadooprdd返回

hadooprdd這個類的getpartitions、compute、getpreferredlocations。

先看getpartitions,它的核心**如下:

val inputsplits = inputformat.getsplits(jobconf, minpartitions)

val array = new array[partition](inputsplits.size)

for (i <- 0 until inputsplits.size)

它呼叫的是inputformat自帶的getsplits方法來計算分片,然後把分片hadooppartition包裝到到array裡面返回。

看compute方法,它的輸入值是乙個partition,返回是乙個iterator[(k, v)]型別的資料,這裡面我們只需要關注2點即可。

1、把partition轉成hadooppartition,然後通過inputsplit建立乙個recordreader

2、 重寫iterator的genext方法,通過建立的reader呼叫next方法讀取下乙個值。

//把partition轉化成hadooppartition

val split = thesplit.asinstanceof[hadooppartition]

loginfo("input split: " + split.inputsplit)

var reader: recordreader[k, v] = null

val jobconf = getjobconf()

val inputformat = getinputformat(jobconf)

hadooprdd.addlocalconfiguration(new ******dateformat("yyyymmddhhmm").format(createtime),

context.stageid, thesplit.index, context.attemptid.toint, jobconf)

//通過inputformat建立乙個recordreader

reader = inputformat.getrecordreader(split.inputsplit.value, jobconf, reporter.null)

override def getnext() = catch

(key, value)

}

從這裡我們可以看得出來compute方法是通過分片來獲得iterator介面,以遍歷分片的資料。

getpreferredlocations方法就更簡單了,直接呼叫inputsplit的getlocations方法獲得所在的位置。

有關依賴:

sc.textfile方法呼叫hadoopfile方法後(返回hadooprdd),緊接著呼叫map方法,hadooprdd中沒有此方法,呼叫父類(rdd)方法,

def this(@transient oneparent: rdd[_]) =

this(oneparent.context , list(new onetoonedependency(oneparent)))

會繼續呼叫rdd的主構造器。此處的oneparent就是傳入的hadooprdd例項。再看rdd的主構造器:

abstract class rdd[t: classtag](

@transient private var sc: sparkcontext,

@transient private var deps: seq[dependency[_]]

) extends serializable with logging

//dependencies是乙個seq,head方法獲取的是其第乙個元素,dependencies.head返回的是乙個dependency,dependency有乙個rdd方法,返回和dependency的rdd。

protected[spark] def firstparent[u: classtag] =

到這兒基本明確了firstparent是什麼,繼續深入就比較複雜,涉及到checkpoint的東西,比較不好理解。

dependencies是乙個dependency的序列,

private var dependencies_ : seq[dependency[_]] = null

//以seq的形式,返回乙個rdd的dependendies。

final def dependencies: seq[dependency[_]] =

dependencies_

} }

在獲取dependencies的過程中,用到了checkpointrdd,

private def checkpointrdd: option[rdd[t]] = checkpointdata.flatmap(_.checkpointrdd)

此時用到了checkpointdata,是option型別,其實例是scala.some或者none的例項,some[a]代表了乙個a型別的值。

private[spark] var checkpointdata: option[rddcheckpointdata[t]] = none

這個類包含了所有和rdd checkpointing相關的資訊。這個類的每乙個例項都和乙個rdd相關。

現在回到dependencies方法,如果存在checkpointrdd的話,則從checkpointrdd獲取依賴資訊,如果不存在,則呼叫getdependencies方法回去依賴。

protected def getdependencies: seq[dependency[_]] = deps

implicit def rddtopairrddfunctions[k, v](rdd: rdd[(k, v)])

(implicit kt: classtag[k], vt: classtag[v], ord: ordering[k] = null) =

在sparkcontext.scala中定義了很多隱式轉化,當找不到相應方法是,可以去查下隱式轉換。

reducebykey的**:

def reducebykey(partitioner: partitioner, func: (v, v) => v): rdd[(k, v)] =

可以看到呼叫了combinebykey方法,下面是其主要**:

如果沒有,則會建立新的shuffledrdd.

Spark RDD原始碼閱讀01

rdd是什麼 resilient distributed dataset 一 rdd的特徵屬性 二 rdd的執行job的流程 rdd 這些方法是判斷這個job結束的標誌,然後開始執行job。11 1號表示獲取當前shuffleddep.rdd的依賴的shufflerdd,2號表示對所依賴的shuff...

spring原始碼分析 spring原始碼分析

1.spring 執行原理 spring 啟動時讀取應用程式提供的 bean 配置資訊,並在 spring 容器中生成乙份相應的 bean 配置登錄檔,然後根據這張登錄檔例項化 bean,裝配好 bean 之間的依賴關係,為上 層應用提供準備就緒的執行環境。二 spring 原始碼分析 1.1spr...

思科VPP原始碼分析(dpo機制原始碼分析)

vpp的dpo機制跟路由緊密結合在一起。路由表查詢 ip4 lookup 的最後結果是乙個load balance t結構。該結構可以看做是乙個hash表,裡面包含了很多dpo,指向為下一步處理動作。每個dpo都是新增路由時的乙個path的結果。dpo標準型別有 dpo drop,dpo ip nu...