Spark 核心 RDD 剖析(上)

2021-07-22 22:52:20 字數 4368 閱讀 5452

本文將通過描述 spark rdd 的五大核心要素來描述 rdd,若希望更全面了解 rdd 的知識,請移步 rdd **:rdd:基於記憶體的集群計算容錯抽象

spark 的五大核心要素包括:

下面一一來介紹

rdd 由若干個 partition 組成,共有三種生成方式:

那麼,在使用上述方法生成 rdd 的時候,會為 rdd 生成多少個 partition 呢?一般來說,載入 scala 集合或外部資料來建立 rdd 時,是可以指定 partition 個數的,若指定了具體值,那麼 partition 的個數就等於該值,比如:

val

rdd1 = sc.makerdd( scalaseqdata, 3 ) //< 指定 partition 數為3

valrdd2 = sc.textfile( hdfsfilepath, 10 ) //< 指定 partition 數為10

若沒有指定具體的 partition 數時的 partition 數為多少呢?

對於從外部資料載入而來的 rdd:預設的 partition 數為min(defaultparallelism, 2)對於執行轉換操作而得到的 rdd:視具體操作而定,如 map 得到的 rdd 的 partition 數與 父 rdd 相同;union 得到的 rdd 的 partition 數為父 rdds 的 partition 數之和...

我們常說,partition 是 rdd 的資料單位,代表了乙個分割槽的資料。但這裡千萬不要搞錯了,partition 是邏輯概念,是代表了乙個分片的資料,而不是包含或持有乙個分片的資料。

真正直接持有資料的是各個 partition 對應的迭代器,要再次注意的是,partition 對應的迭代器訪問資料時也不是把整個分割槽的資料一股腦載入持有,而是像常見的迭代器一樣一條條處理。舉個例子,我們把 hdfs 上10g 的檔案載入到 rdd 做處理時,並不會消耗10g 的空間,如果沒有 shuffle 操作(shuffle 操作會持有較多資料在記憶體),那麼這個操作的記憶體消耗是非常小的,因為在每個 task 中都是一條條處理處理的,在某一時刻只會持有一條資料。這也是初學者常有的理解誤區,一定要注意 spark 是基於記憶體的計算,但不會傻到什麼時候都把所有資料全放到記憶體。

讓我們來看看 partition 的定義幫助理解:

trait partition extends serializable
在 trait partition 中僅包含返回其索引的 index 方法。很多具體的 rdd 也會有自己實現的 partition,比如:

kafkarddpartition 提供了獲取 partition 所包含的 kafka msg 條數的方法

class

kafkarddpartition(

valindex:

int,

valtopic:

string,

valpartition:

int,

valfromoffset:

long,

valuntiloffset:

long,

valhost:

string,

valport:

int) extends

partition

unionrdd 的 partition 類 unionpartition 提供了獲取依賴的父 partition 及獲取優先位置的方法

private[spark] class

unionpartition[

t: classtag](

idx: int,

@transient

private

valrdd:

rdd[t],

valparentrddindex:

int,

@transient

private

valparentrddpartitionindex:

int)

extends

partition

rdd 的def iterator(split: partition, context: taskcontext): iterator[t]方法用來獲取 split 指定的 partition 對應的資料的迭代器,有了這個迭代器就能一條一條取出資料來按 compute chain 來執行乙個個transform 操作。iterator 的實現如下:

final def

iterator

(split: partition, context: taskcontext): iterator[t] = else

}

def 前加了 final 說明該函式是不能被子類重寫的,其先判斷 rdd 的 storagelevel 是否為 none,若不是,則嘗試從快取中讀取,讀取不到則通過計算來獲取該 partition 對應的資料的迭代器;若是,嘗試從 checkpoint 中獲取 partition 對應資料的迭代器,若 checkpoint 不存在則通過計算來獲取。

剛剛介紹了如果從 cache 或者 checkpoint 無法獲得 partition 對應的資料的迭代器,則需要通過計算來獲取,這將會呼叫到def compute(split: partition, context: taskcontext): iterator[t]方法,各個 rdd 最大的不同也體現在該方法中。後文會詳細介紹該方法

partitioner 即分割槽器,說白了就是決定 rdd 的每一條訊息應該分到哪個分割槽。但只有 k, v 型別的 rdd 才能有 partitioner(當然,非 key, value 型別的 rdd 的 partitioner 為 none),非 key, value 型別的 rdd 的 partition 為 none。

partitioner 為 none 的 rdd 的 partition 的資料要麼對應資料來源的某一段資料,要麼來自對父 rdds 的 partitions 的處理結果。

我們先來看看 partitioner 的定義及注釋說明:

abstract

class

partitioner

extends

serializable

partitioner 共有兩種實現,分別是 hashpartitioner 和 rangepartitioner

先來看 hashpartitioner 的實現(省去部分**):

class

hashpartitioner(partitions: int) extends

partitioner

...}// x 對 mod 求于,若結果為正,則返回該結果;若結果為負,返回結果加上 mod

defnonnegativemod(x: int, mod: int): int =

numpartitions直接返回主建構函式中傳入的 partitions 引數,之前在有本書裡看到說 partitioner 不僅決定了一條 record 應該屬於哪個 partition,還決定了 partition 的數量,其實這句話的後半段的有誤的,partitioner 並不能決定乙個 rdd 的 partition 數,partitioner 方法返回的 partition 數是直接返回外部傳入的值。

getpartition方法也不複雜,主要做了:

為引數 key 計算乙個 hash 值

若該雜湊值對 partition 個數取與結果為正,則該結果即該 key 歸屬的 partition index;否則,以該結果加上 partition 個數為 partition index

從上面的分析來看,當 key, value 型別的 rdd 的 key 的 hash 值分布不均勻時,會導致各個 partition 的資料量不均勻,極端情況下乙個 partition 會持有整個 rdd 的資料而其他 partition 則不包含任何資料,這顯然不是我們希望看到的,這時就需要 rangepartitioner 出馬了。

上文也提到了,hashpartitioner 可能會導致各個 partition 資料量相差很大的情況。這時,初衷為使各個 partition 資料分布盡量均勻的 rangepartitioner 便有了用武之地。

rangepartitioner 將乙個範圍內的資料對映到 partition,這樣兩個 partition 之間要麼是乙個 partition 的資料都比另外乙個大,或者小。rangepartitioner採用水塘抽樣演算法,比 hashpartitioner 耗時,具體可見:spark分割槽器hashpartitioner和rangepartitioner**詳解

spark底層核心 RDD詳解

spark底層核心rdd 是什麼?彈性分布式資料集 簡單點就理解成乙個list集合 rdd 1,2,3 有什麼屬性?用idea中注釋的話來解釋有5大屬性 1 乙個分割槽列表,資料集的基本組成單位 rdd以分割槽為單位,乙個分割槽乙個task任務來處理執行,可以在建立rdd時指定rdd的分割槽個數,如...

Spark核心架構深度剖析

driver 就是我們用來提交編寫的spark程式的一台機器,在driver中最重要的一件事 建立sparkcontext sparkcontext 我們在建立sparkcontext的過程中,最重要的3件事,其一建立dagsechedule 有向無迴圈圖排程者 其二建立taskscheduler ...

Spark核心架構深度剖析

1,通過spark submit提交編寫好的spark程式,這時候spark會通過反射的方式,建立和構造乙個driveractor程序出來。3,應用程式每執行到乙個action就會建立乙個job,job會提交給dagscheduler,dagscheduler會通過stage劃分演算法 5,mast...