rdd( resilient distributed dataset ) 彈性分布式資料集;rdd代表是乙個不可變的、可分割槽的、支援平行計算的元素集合(類似於scala中的不可變集合),rdd可以通過hdfs、scala集合、rdd轉換、外部的資料集(支援inputformat)獲得;並且我們可以通知spark將rdd持久化在記憶體中,可以非常高效的重複利用或者在某些計算節點故障時自動資料恢復;
在對資料來源rdd應用轉換操作時,產生的新的rdd會有一種依賴關係稱為血統(lineage); spark應用在計算時會根據血統(lineage)逆向推導出所有stage(階段),每乙個stage的分割槽數量決定了任務的並行度,乙個stage實現任務的本地計算(大資料計算時網路傳輸時比較耗時的)
def main(args: array[string]
): unit =
}寬窄依賴
rdd之間血緣關係可以詳細分為兩種:寬依賴(wide dependency)和窄依賴(narrow dependency);
rdd的容錯
spark計算時當某些task在計算時未正確處理,則會觸發rdd的容錯機制;容錯機制分為三種情況:
rdd持久化(persist)
// 持久化方法 memory_only
rdd.
persist()
// 快取方法 memory_only
rdd.
cache()
// 設定持久化級別
rdd.
persist
(storagelevel.memory_and_disk)
// 取消rdd持久化方法
注意:
檢查點機制(checkpoint)
// 設定檢查點資料的存放目錄
sc.setcheckpointdir
("hdfs://sparkonstandalone:9000/checkpoint"
)// rdd由兩個分割槽構成
val rdd = sc.
makerdd
(list
("hello kafka"
,"hello scala"
,"hello hadoop"),
2)val grouprdd = rdd
.flatmap
(_.split
("\\s"))
.map
((_,
1l))
.groupbykey(3
)// 標記檢查點
RDD的依賴關係,以及造成的stage的劃分
val data array 1 2,3 4,5 val distdata sc.parallelize data val resultrdd distdata.flatmap v 1 to v map v v 2,1 reducebykey resultrdd.todebugstring 檢視rd...
SparkRDD的分割槽
rdd的分割槽,在運算元裡面未指定rdd的分割槽的時候,預設的分割槽數和核數相同,同理也會啟動相應的task個數 原始碼中的分割槽數預設是2 sc.textfile 其中分割槽數和讀取的小檔案數相同,都小於128m,基於spark2.2.0的,textfile預設是呼叫的是hadoop的textfi...
SparkRDD的廣播變數
廣播變數用來高效分發較大的物件。向所有工作節點傳送乙個較大的唯讀值,以供乙個或多個spark操作使用。比如,如果你的應用需要向所有節點傳送乙個較大的唯讀查詢表,甚至是機器學習演算法中的乙個很大的特徵向量,廣播變數用起來都很順手。在多個並行操作中使用同乙個變數,但是 spark會為每個任務分別傳送。s...