Spark RDD的stage劃分和容錯

2021-10-04 04:57:48 字數 1812 閱讀 6077

rdd( resilient distributed dataset ) 彈性分布式資料集;rdd代表是乙個不可變的、可分割槽的、支援平行計算的元素集合(類似於scala中的不可變集合),rdd可以通過hdfs、scala集合、rdd轉換、外部的資料集(支援inputformat)獲得;並且我們可以通知spark將rdd持久化在記憶體中,可以非常高效的重複利用或者在某些計算節點故障時自動資料恢復;

在對資料來源rdd應用轉換操作時,產生的新的rdd會有一種依賴關係稱為血統(lineage); spark應用在計算時會根據血統(lineage)逆向推導出所有stage(階段),每乙個stage的分割槽數量決定了任務的並行度,乙個stage實現任務的本地計算(大資料計算時網路傳輸時比較耗時的)

def main(args: array[string]

): unit =

}寬窄依賴

rdd之間血緣關係可以詳細分為兩種:寬依賴(wide dependency)和窄依賴(narrow dependency);

rdd的容錯

spark計算時當某些task在計算時未正確處理,則會觸發rdd的容錯機制;容錯機制分為三種情況:

rdd持久化(persist)

// 持久化方法 memory_only

rdd.

persist()

// 快取方法 memory_only

rdd.

cache()

// 設定持久化級別

rdd.

persist

(storagelevel.memory_and_disk)

// 取消rdd持久化方法

注意:

檢查點機制(checkpoint)

// 設定檢查點資料的存放目錄

sc.setcheckpointdir

("hdfs://sparkonstandalone:9000/checkpoint"

)// rdd由兩個分割槽構成

val rdd = sc.

makerdd

(list

("hello kafka"

,"hello scala"

,"hello hadoop"),

2)val grouprdd = rdd

.flatmap

(_.split

("\\s"))

.map

((_,

1l))

.groupbykey(3

)// 標記檢查點

RDD的依賴關係,以及造成的stage的劃分

val data array 1 2,3 4,5 val distdata sc.parallelize data val resultrdd distdata.flatmap v 1 to v map v v 2,1 reducebykey resultrdd.todebugstring 檢視rd...

SparkRDD的分割槽

rdd的分割槽,在運算元裡面未指定rdd的分割槽的時候,預設的分割槽數和核數相同,同理也會啟動相應的task個數 原始碼中的分割槽數預設是2 sc.textfile 其中分割槽數和讀取的小檔案數相同,都小於128m,基於spark2.2.0的,textfile預設是呼叫的是hadoop的textfi...

SparkRDD的廣播變數

廣播變數用來高效分發較大的物件。向所有工作節點傳送乙個較大的唯讀值,以供乙個或多個spark操作使用。比如,如果你的應用需要向所有節點傳送乙個較大的唯讀查詢表,甚至是機器學習演算法中的乙個很大的特徵向量,廣播變數用起來都很順手。在多個並行操作中使用同乙個變數,但是 spark會為每個任務分別傳送。s...