spark上開發的應用程式都是由乙個driver programe構成,這個所謂的驅動程式在spark集群通過跑main函式來執行各種並行操作。集群上的所有節點進行平行計算需要共同訪問乙個分割槽元素的集合,這就是rdd(rdd resilient distributed dataset)彈性分布式資料集。rdd可以儲存在記憶體或磁碟中,具有一定的容錯性,可以在節點宕機重啟後恢復。
在spark 中,對資料的所有操作不外乎建立rdd、轉化已有rdd 以及調rdd 操作進行求值。而在這一切背後,spark 會自動將rdd 中的資料分發到集群上,並將操作並行化執行。
建立rdd有兩種方式:一種是通過並行化驅動程式中的已有集合建立,另外一種方法是讀取外部資料集。
一種非常簡單的建立rdd的方式,將程式中的乙個集合傳給sparkcontext
的parallelize()
方法。
python中的操作(pyspark開啟shell):
>>> data = [1, 2, 3, 4, 5]
>>> distdata = sc.parallelize(data)
# 對rdd進行測試操作
# 對集合中的所有元素進行相加,返回結果為15
>>> distdata.reduce(lambda a, b: a + b)
15
scala中的操作(spark-shell開啟shell):
scala> val data = array(1,2,3,4,5)
data: array[int] = array(1, 2, 3, 4, 5)
scala> val distdata = sc.parallelize(data)
distdata: org.apache
.spark
.rdd
.rdd[int] = parallelcollectionrdd[0] at parallelize at :23
scala> distdata.reduce((a,b)=>a+b)
res0: int = 15
spark可以從任何hadoop支援的儲存上建立rdd,比如本地的檔案系統,hdfs,cassandra等。spark可以支援文字檔案,sequencefiles等。
這種方法更為常用。
python:
# 從protocols檔案建立rdd
distfile = sc.textfile("/etc/protocols")
scala:
// 從protocols檔案建立rdd
val distfile = sc.textfile("/etc/protocols")
rdd支援兩種操作:轉換(transformations):將已存在的資料集轉換成新的資料集,例如map。轉換是惰性的,不會立刻計算結果,僅僅記錄轉換操作應用的目標資料集,當動作需要乙個結果時才計算。動作(actions) :資料集計算後返回乙個值給驅動程式,例如reduce
轉換操作是懶惰的,舉個例子:
>>> lines = sc.textfile("readme.md")
>>> pythonlines = lines.filter(lambda line: "python"
inline)
>>>> pythonlines.first()
如果spark 在我們執行lines = sc.textfile(…) 時就把檔案中所有的行都讀取並儲存起來,就會消耗很多儲存空間,而我們馬上就要篩選掉其中的很多資料。相反, 一旦spark了解了完整的轉化操作鏈之後,它就可以只計算求結果時真正需要的資料。事實上,在行動操作first() 中,spark 只需要掃瞄檔案直到找到第乙個匹配的行為止,而不需要讀取整個檔案。
下面引用《spark快速大資料分析》的一段話:
我們不應該把rdd 看作存 放著特定資料的資料集,而最好把每個rdd 當作我們通過轉化操作構建出來的、記錄如 何計算資料的指令列表。把資料讀取到rdd 的操作也同樣是惰性的。因此,當我們呼叫 sc.textfile() 時,資料並沒有讀取進來,而是在必要時才會讀取。和轉化操作一樣的是, 讀取資料的操作也有可能會多次執行。簡單的例子理解轉換和行動操作:python:
# 從protocols檔案建立rdd
distfile = sc.textfile("/etc/protocols")
# map操作,每行的長度,轉換操作
linelengths = distfile.map(lambda s: len(s))
# reduce操作,獲得所有行長度的和,即檔案總長度,這裡才會執行map運算
totallength = linelengths.reduce(lambda a, b: a + b)
# 可以將轉換後的rdd儲存到集群記憶體中
linelengths.persist()
scala:
// 從protocols檔案建立rdd
val distfile = sc.textfile("/etc/protocols")
// map操作,每行的長度
val linelengths = distfile.map(s => s.length)
// reduce操作,獲得所有行長度的和,即檔案總長度,這裡才會執行map運算
val totallength = linelengths.reduce((a, b) => a + b)
// 可以將轉換後的rdd儲存到集群記憶體中
linelengths.persist()
如下圖所示:
匿名函式:lambda的大量使用,讓簡單的函式寫成表示式的樣子。
模組裡的頂層函式
spark中已呼叫函式中定義的本地函式
# 定義需要傳遞給spark的函式
defmyfunc
(s):
words = s.split(" ")
return len(words)
# 將詞數統計函式傳遞給spark
sc.textfile("/etc/protocols").map(myfunc).reduce(lambda a,b:a+b)
上述中定義的函式為計算一行字串中單詞的個數,
sc.textfile("/etc/protocols").map(myfunc).reduce(lambda a,b:a+b)
表明先讀取內容轉出rdd,然後轉換map()是對每一行都進行統計操作,然後進行行動操作,用lambda匿名函式計算所有行的單詞和。
匿名函式:常用於短小的**片段。
全域性單例物件中的靜態函式:例如,你可以定義乙個object myfunctions,然後將myfunctions.func1傳入,就像下面這樣:
//建立乙個單例物件myfunctions
object myfunctions
}val linelengths = sc.textfile("/etc/protocols").map(myfunctions.func1)
val count = linelengths.reduce((a,b)=>a+b)
與python幾乎一樣,注意其匿名函式的寫法。
參考資料:
1. 《spark快速大資料分析》
2.
spark學習 RDD程式設計
rdd建立 從從檔案系統中載入資料建立rdd 1.spark採用textfile 從檔案系統中載入資料建立rdd 可以使本地,分布式系統等 2.把檔案的url作為引數 可以是本地檔案系統的位址,分布式檔案系統hdfs的位址等等 從本地檔案中載入資料 sc為系統自動建立的sparkcontext,不用...
Spark學習之RDD程式設計(一)
rdd是spark的核心概念,它是乙個可讀的 可分割槽的分布式資料集,這個資料集的全部或部分可以快取在記憶體中,可在多次計算間重用。spark用scala語言實現了rdd的api,我們可以通過呼叫api實現對rdd的各種操作,從而實現各種複雜的應用。spark採用textfile 方法從檔案系統中載...
Spark程式設計模型 RDD
spark程式設計模型是彈性分布式資料集 resilient distributed dataset,rdd 是mapreduce模型的擴充套件和延伸 基於rdd機制實現了多類模型計算,如 1.迭代計算 2.互動式sql查詢 3.mapreduce rdd 4.流式資料處理。markdown 是一種...