spark原始碼剖析 RDD相關原始碼閱讀筆記

2021-07-05 06:24:39 字數 3885 閱讀 8669

最好的原始碼閱讀方法就是除錯,沒有之一

之前其實有閱讀過rdd相關的原始碼,最近學習過程中發現在之前原本閱讀過的模組中有一些『關節』並沒有打通,所以想通過除錯的方式來更細緻得學習原始碼。

本文為編寫測試用例並除錯rdd相關模組的筆記,並沒有列出具體的除錯過程,僅列出結論以做備忘,特別是那些比較容易忽略或者說是其他blog或者書本中比較少提到的。

//< 儲存與其直接父rdds的依賴

private var dependencies_ : seq[dependency[_]] = null

protected def

getdependencies: seq[dependency[_]] = deps

final def

dependencies: seq[dependency[_]] =

dependencies_}}

當除錯**時,若碰到明明由其他rdd執行transform操作而生成的rdd的dependencies_成員為空,比如下面這種情況:

莫要驚慌,並不是hashrdd1沒有依賴的父rdd,而是沒有將該依賴賦值給dependencies_而已,在產生hashrdd1後,只需呼叫hashrdd1.dependencies就可以在除錯的監控視窗看到hashrdd1.dependencies_該有的值了。

這並沒有什麼不妥,在之後dagscheduler劃分stage過程中,會呼叫所有rdd的dependencies方法。

protected def

getpartitions: array[partition]

//< partitions_為陣列型別

@transient private var partitions_ : array[partition] = null

final def

partitions: array[partition] =

partitions_

}}//< 可選的

@transient val partitioner: option[partitioner] = none

分割槽相關的需要知道以下幾點:

1. 與『依賴』一樣,若明明由其他rdd執行transform操作而生成的rdd的partitions_成員為空並不是該rdd沒有分割槽,只是還沒將分割槽賦值給partitions_,之後的操作會通過呼叫成員函式partitions進行賦值並返回

2. partitioner為可選值,對於非key-value的rdd該值為none。key-value型別的rdd也需要在建立時指定partitioner。目前公有hashpartitionerrangepartitioner兩種partitioner

rdd支援快取,可支援快取到磁碟、記憶體、offheap及是否序列化快取。rdd包含快取等級成員:

private var storagelevel: storagelevel = storagelevel.none,預設為storagelevel.none,storagelevel定義及storagelevel.none定義如下:

//< storagelevel定義

class storagelevel private(

private var _usedisk: boolean,

private var _usememory: boolean,

private var _useoffheap: boolean,

private var _deserialized: boolean,

private var _replication: int = 1)

object storagelevel

從上面**可以看出預設值storagelevel.none表示不以任何方式快取rdd

對於這個成員,我不知道怎麼用中文描述好,該成員在rdd主建構函式中定義如下

@transient private[spark] val creationsite = sc.getcallsite(),表示程式設計師寫的具體哪個類中的哪行**生成這個rdd。

有兩種格式,一種是shortform,即短格式,如parallelize at divedestage.scala:14;還有一種是longform,即長格式,如

org.apache

.spark

.sparkcontext

.parallelize(sparkcontext.scala:563)

dividestages$.main(divedestage.scala:14)

dividestages.main(divedestage.scala)

rdd還提供了getcreationsite方法以獲取該creationsite,該方法會在rdd.tostring方法中被呼叫以告知該rdd是程式設計師寫的哪行**生成的

在rdd呼叫checkpoint方法時,如下:

def checkpoint()  else

if (checkpointdata.isempty)

}

會建立乙個rddcheckpointdata例項,該類儲存與checkpint相關的所有資訊,包括:關聯的rdd,check point狀態(公有initialized, markedforcheckpoint, checkpointinginprogress, checkpointed集中狀態),check point檔案及check point得到的rdd。

在呼叫checkpointdata = some(new rddcheckpointdata(this))時,check point狀態被置為initialized。之後呼叫checkpointdata.get.markforcheckpoint()將check point狀態置為markedforcheckpoint。這裡只是標記該rdd需要check point,並不會真的執行check point。

check point是個比較複雜的流程,我將會專門寫一篇文章介紹,這裡就不展開了。

rdd的成員和方法有好幾十個,無法一一枚舉。最後再列幾個方法:

//< 由子類實現來計算乙個給定的分割槽

@developerapi

defcompute

(split: partition, context: taskcontext): iterator[t]

//< 獲得某個partition的優先位置,這對排程task非常重要

protected def

getpreferredlocations

(split: partition): seq[string] = nil

//< 快取rdd,可以快取記憶體、磁碟、offheap等;cache方法其實也是對該方法進行了封裝

defpersist

(newlevel: storagelevel): this.type =

rdd具有name屬性,可通過rdd.setname設定,name預設為nul

Spark 核心 RDD 剖析(上)

本文將通過描述 spark rdd 的五大核心要素來描述 rdd,若希望更全面了解 rdd 的知識,請移步 rdd rdd 基於記憶體的集群計算容錯抽象 spark 的五大核心要素包括 下面一一來介紹 rdd 由若干個 partition 組成,共有三種生成方式 那麼,在使用上述方法生成 rdd 的...

原始碼剖析 Hashtable 原始碼剖析

hashtable同樣是基於雜湊表實現的,同樣每個元素都是key value對,其內部也是通過單鏈表解決衝突問題,容量不足 超過了閾值 時,同樣會自動增長。hashtable也是jdk1.0引入的類,是執行緒安全的,能用於多執行緒環境中。hashtable同樣實現了serializable介面,它支...

RDD原始碼分析 Iterator

rdd,resilient distributed datasets,彈性分布式資料集。在spark中,通俗地可以認為是乙個資料集合,只不過這個資料集合分布在不同的機器上,對外表現為乙個整體。一般來講,對rdd進行操作比如map操作時分為兩步,第一步為區域性操作,即是對每台機器上的rdd的部分資料都...