1、 rdd是最基本的資料抽象,它代表乙個不可變、可分割槽、裡面的元素可平行計算的集合,具有資料流模型的特點:自動容錯,位置感知性排程和可伸縮性,允許使用者在執行多個查詢時顯式的將工作集快取在記憶體中,後續的查詢能夠重用工作集,提公升了查詢速度。
2、rdd屬性:
3、rdd特性
1、transformation
transformation用於對rdd的建立,rdd只能使用transformation建立,同時還提供大量操作方法,包括map,filter,groupby,join等,rdd利用這些操作生成新的rdd,但是需要注意,無論多少次transformation,在rdd中真正資料計算action之前都不可能真正執行。
2、action
action是資料執行部分,其通過執行count,reduce,collect等方法真正執行資料的計算部分。實際上,rdd中所有的操作都是lazy模式進行,執行在編譯中不會立即計算最終結果,而是記住所有操作步驟和方法,只有顯示的遇到啟動命令才執行。這樣做的好處在於大部分前期工作在transformation時已經完成,當action工作時,只需要利用全部自由完成業務的核心工作。
3、下面是在python中對rdd的生成,以及一些基本的transformation,action操作。
from pyspark import sparkcontext, sparkconf
from pyspark.streaming import streamingcontext
import math
master= "local"#設定單機
sc = sparkcontext(conf=conf)
# parallelize:並行化資料,轉化為rdd
data = [1, 2, 3, 4, 5]
distdata = sc.parallelize(data, numslices=10) # numslices為分塊數目,根據集群數進行分塊
# textfile讀取外部資料
rdd = sc.textfile("./c2.txt") # 以行為單位讀取外部檔案,並轉化為rdd
print rdd.collect()
# map:迭代,對資料集中資料進行單獨操作
def my_add(l):
return (l,l)
data = [1, 2, 3, 4, 5]
distdata = sc.parallelize(data) # 並行化資料集
result = distdata.map(my_add)
print (result.collect()) # 返回乙個分布資料集
# filter:過濾資料
def my_add(l):
result = false
if l > 2:
result = true
return result
data = [1, 2, 3, 4, 5]
distdata = sc.parallelize(data)#並行化資料集,分片
result = distdata.filter(my_add)
print (result.collect())#返回乙個分布資料集
# zip:將兩個rdd對應元素組合為元組
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
print x.zip(y).collect()
#union 組合兩個rdd
print x.union(x).collect()
# aciton操作
# collect:返回rdd中的資料
rdd = sc.parallelize(range(1, 10))
print rdd
print rdd.collect()
# collectasmap:以rdd元素為元組,以元組中乙個元素作為索引返回rdd中的資料
m = sc.parallelize([('a', 2), (3, 4)]).collectasmap()
print m['a']
print m[3]
# groupby函式:根據提供的方法為rdd分組:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
def fun(i):
return i % 2
result = rdd.groupby(fun).collect()
print [(x, sorted(y)) for (x, y) in result]
# reduce:對資料集進行運算
rdd = sc.parallelize(range(1, 10))
result = rdd.reduce(lambda a, b: a + b)
print result
map() # 將函式應用於 rdd 中的每個元素,將返回值構成新的 rdd
rdd.map(x => x + 1)
flatmap() # 將函式應用於 rdd 中的每個元素,將返回的迭代器的所有內容構成新的 rdd。通常用來切
分單詞rdd.flatmap(x => x.to(3))
filter() # 返回乙個由通過傳給 filter()的函式的元素組成的 rdd
rdd.filter(x => x != 1)
distinct() # 去重 rdd.distinct()
sample(withre
placement, fra
ction, [seed])
union() # 生成乙個包含兩個 rdd 中所有元素的 rdd
rdd.union(other) #
intersection() # 求兩個 rdd 共同的元素的 rdd rdd.intersection(other)
subtract() # 移除乙個 rdd 中的內容(例如移除訓練資料)
rdd.subtract(other) #
cartesian() # 與另乙個 rdd 的笛卡兒積 rdd.cartesian(other)
collect() # 返回 rdd 中的所有元素 rdd.collect()
count() # rdd 中的元素個數 rdd.count() 4
countbyvalue() # 各元素在 rdd **現的次數 rdd.countbyvalue()
take(num) # 從 rdd 中返回 num 個元素 rdd.take(2)
top(num) # 從 rdd 中返回最前面的 num個元素
rdd.top(2) #
takeordered(num)
(ordering)
# 從 rdd 中按照提供的順序返回最前面的 num 個元素
rdd.takeordered(2)(myordering) #
takesample(withreplace
ment, num, [seed])
# 從 rdd 中返回任意一些元素 rdd.takesample(false, 1) 非確定的reduce(func) 並 行 整 合 rdd 中 所 有 數 據(例如 sum)
rdd.reduce((x, y) => x + y) 9
fold(zero)(func) # 和 reduce() 一 樣, 但 是 需 要提供初始值
rdd.fold(0)((x, y) => x + y) 9
aggregate(zerovalue)
(seqop, combop)
# 和 reduce() 相 似, 但 是 通 常返回不同型別的函式
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
(9,4)
foreach(func) # 對 rdd 中的每個元素使用給定的函式
rdd.foreach(func)
從測試人員的角度理解專案流程
這是一篇從測試人員角度介紹對於整個專案流程理解,主要重點介紹需求評審階段至產品上線後維護階段。在需求評審之前的階段,即產品設計階段,測試一般不參與,主要由產品經理基於市場調研及使用者需求,產出產品原型,後續由ui設計師輸出主要介面ui設計規範。1 ui設計文件及需求互動文件 ui稿明確業務實現細節,...
從開發人員角度看待效能基準測試
對乙個開發人員來說,除了保質保量按時完成功能需求外,非功能也不可忽視。決定乙個軟體的成敗往往是非功能性需求比如效能,若是使用者體驗不好那麼必定是個失敗的作品。那麼乙個開發人員如何去做關於自己模組又或者整體的基準效能測試呢?以下將從測試的切入點和具體測試的指標來說明。切入點 通常,基準效能測試有兩個切...
Spark學習進度 實戰測試
題目 該資料集包含了某大學計算機系的成績,資料格式如下所示 tom,database,80 tom,algorithm,50 tom,datastructure,60 jim,database,90 jim,algorithm,60 jim,datastructure,80 請根據給定的實驗資料,在...