什麼是rdd
rdd(resilient distributed dataset)叫做分布式資料集,是spark中最基本的資料抽象,它代表乙個不可變、可分割槽、裡面的元素可平行計算的集合。rdd具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。rdd允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提公升了查詢速度。
rdd的屬性
一組分片(partition),即資料集的基本組成單位。對於rdd來說,每個分片都會被乙個計算任務處理,並決定平行計算的粒度。使用者可以在建立rdd時指定rdd的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的cpu core的數目。
乙個計算每個分割槽的函式。spark中rdd的計算是以分片為單位的,每個rdd都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。
rdd之間的依賴關係。rdd的每次轉換都會生成乙個新的rdd,所以rdd之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對rdd的所有分割槽進行重新計算。
乙個partitioner,即rdd的分片函式。當前spark中實現了兩種型別的分片函式,乙個是基於雜湊的hashpartitioner,另外乙個是基於範圍的rangepartitioner。只有對於於key-value的rdd,才會有partitioner,非key-value的rdd的parititioner的值是none。partitioner函式不但決定了rdd本身的分片數量,也決定了parent rdd shuffle輸出時的分片數量。
乙個列表,儲存訪問每個partition的優先位置(preferred location)。對於乙個hdfs檔案來說,這個列表儲存的就是每個partition所在的塊的位置。按照「移動資料不如移動計算」的理念,spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置。
基本rdd操作
建立rdd:
1)讀取外部資料集
val file=sc.textfile(「hdfs://hadoop1:9000/input/word/word.txt」)
2)在驅動器程式中對乙個集合進行並行化
val lines = sc.parallelize(list("pandas","i like pandas"))
rdd操作:
rdd轉化操作是返回乙個新的rdd的操作,比如map()和filter()
rdd行動操作則是向驅動器程式返回結果或把結果寫入外部系統的操作,會觸發實際的計算
1)轉化操作
val inputrdd = sc.textfile(「hdfs://hadoop1:9000/input/word/word.txt」)
val keyrdd = inputrdd.filter(line => line.contains("guofei"))
2)行動操作0
val keyrdd = inputrdd.filter(line => line.contains("guofei"))
wantrdd.take(10).foreach(println)
常見的轉化操作和行動操作
1.轉化操作
map()與flatmap()區別
flatmap 將函式應用於rdd中的每個元素,將返回的迭代器的所有的內容構成新的rdd,通常用來切分單詞
val lines = sc.parallelize(list(「come on」,」guofei」))
var words = lines.flatmap(line => line.split(」 「))
words.collect()
map 將函式應用於rdd中的每個元素,將返回值構成新的rdd
var words1 = lines.map(line => line.split(" "))
words1.collect()
filter 返回乙個由通過傳給filter()的函式的元素組成的rdd
val list = sc.parallelize(list(1,2,3,3))
val listfilter = list.filter(x => x != 1)
listfilter.collect()
distinct 去重
val listdistinct = list.distinct()
listdistinct.collect()
union() 生成乙個包含倆哥哥rdd中所有元素的rdd
val list = sc.parallelize(list(3,4,5))
val list1 = sc.parallelize(list(1,2,3))
val union = list.union(list1)
union.collect()
intersection() 求兩個rdd共同的元素的rdd
list.intersection(list1).collect()
subtract() 移除裡乙個rdd中的內容
list.subtract(list1).collect()
cartesian() 與另乙個rdd的笛卡兒積
list.cartesian(list1).collect()
2.行動操作
reduce()
val list = sc.parallelize(list(3,4,5))
list.reduce((x,y) => x + y)
collect() 返回rdd中的所有元素
count() rdd中的元素個數
countbyvalue() 各元素在rdd中出現的次數
take(num) 從rdd中返回num個數
top(num) rdd中返回最前面的num個元素
takeordered(num)(ordering) 從rdd中按照提供的舒徐返回最前見的num元素
reduce(func) 並行整合rdd中左右資料
fold(zero)(func) 和reduce一樣,但是需要提供初始值
aggregate(zerovalue)(seqop,combop) 和reduce相似,但是通常返回不同型別的函式
鍵值對操作:
建立pair rdd
使用第乙個單詞作為鍵建立出乙個pair rdd
val file=sc.textfile(「hdfs://hadoop1:9000/input/word/word.txt」)
file.map(x => (x.split(」 「)(0),x)).collect()
pair rdd的轉化操作
建立pair
val list1 = sc.parallelize(list((1,2),(3,4),(3,6)))
list1.collect()
reducebykey(func) 合併具有相同鍵的值
list1.reducebykey((x,y) => x+y).collect()
groupbykey() 對具有相同鍵的值進行分組
list1.groupbykey.collect()
mapvalues(func) 對pair rdd中的每個值應用乙個函式而不改變鍵
list1.mapvalues(x => x+1).collect()
flatmapvalues(func) 對pair rdd中的每個值應用乙個返回迭代器的函式,然後對返回的每個元素都生成乙個對應原鍵對記錄。通常用於符號化
list1.flatmapvalues(x => (x to 5)).collect()
keys() 返回乙個僅包含鍵的rdd
list1.keys.collect()
values() 返回乙個僅包含值得rdd
list1.values.collect()
sortbykey() 返回乙個根據鍵排序的rdd
list1.sortbykey().collect()
針對兩個pair rdd的轉化操作
val rdd = sc.parallelize(list((1,2),(3,4),(3,6)))
val other = sc.parallelize(list((1,2)))
subtractbykey 刪掉rdd中鍵與other中的鍵相同的元素
rdd.subtractbykey(other).collect()
join 對兩個rdd進行內連線
rdd.join(other).collect()
leftouterjoin() 對兩個rdd進行連線操作,確保第二個rdd的鍵必須存在(左外連線)
rdd.leftouterjoin(other).collect()
cogroup() 將兩個rdd中擁有相同鍵的資料分組到一起
rdd.cogroup(other).collect()
Spark入門RDD操作
rdd resilient distributed datasets 彈性分布式資料集,是spark中的抽象資料結構型別,任何資料在spark中都被表示為rdd.從程式設計的角度來看,rdd可以簡單看成是乙個陣列.和普通陣列的區別是,rdd中的資料是分割槽儲存的,這樣不同分割槽的資料就可以分布在不同...
Spark工作原理及RDD
1.基於記憶體 2.迭代式計算 3.分布式 基本工作原理 將spark的程式提交到spark集群上,在hadoop的hdfs或者hive上讀取資料,讀取的資料存放在各個spark的節點上,分布式的存放在多個節點上,主要在每個節點的記憶體上,這樣可以加快速度。對節點的資料進行處理,處理後的資料存放在其...
Spark學習 RDD程式設計基礎
spark上開發的應用程式都是由乙個driver programe構成,這個所謂的驅動程式在spark集群通過跑main函式來執行各種並行操作。集群上的所有節點進行平行計算需要共同訪問乙個分割槽元素的集合,這就是rdd rdd resilient distributed dataset 彈性分布式資...