Spark學習之RDD程式設計(一)

2021-09-02 13:50:36 字數 4308 閱讀 1332

rdd是spark的核心概念,它是乙個可讀的、可分割槽的分布式資料集,這個資料集的全部或部分可以快取在記憶體中,可在多次計算間重用。spark用scala語言實現了rdd的api,我們可以通過呼叫api實現對rdd的各種操作,從而實現各種複雜的應用。

spark採用textfile()方法從檔案系統中載入資料建立rdd,該方法把檔案的uri作為引數,這個uri可以是本地檔案系統的位址、分布式檔案系統hdfs的位址

1.從本地檔案系統中載入資料

在spark--shell互動環境中,執行如下命令:

其中,「lines:org.apache.spark.rdd.rdd[string]......"是命令執行返回的資訊,從中可以看出,執行sc.textfile()方法以後,spark從本地檔案word.txt中載入資料到記憶體,在記憶體中生成乙個rdd物件lines,lines是org.apache.spark.rdd.rdd這個類的乙個例項,這個rdd裡面包含了若干個元素,每個元素的型別是string型別,也就是說,從word.txt檔案中讀取出來的每一行文字內容,都成為rdd中的每乙個元素,如果word.txt中包含了1000行,那麼,lines這個rdd中就會包含1000個string型別的元素。

2.從分布式檔案系統hdfs中載入資料

從hdfs中載入資料的命令如下:

3.通過並行集合(陣列)建立rdd

可以呼叫sparkcontext的parallelize方法,從乙個已經存在的集合(陣列)上建立rdd,命令如下:

rdd操作包括兩種型別,即轉換(transformation)操作和行動(action操作)。

對於rdd而言,每一次轉換操作都會產生不同的rdd,供給下乙個操作使用。rdd的轉換過程是惰性求值的,也就是說,整個轉換過程只是記錄了轉換的軌跡,並不會傳送真正的計算,只有遇到行動操作時,才會觸發"從頭到尾」真正的計算。下表給出了常用的rdd轉換操作api,其中很多都是高階函式,比如,filter(func)就是乙個高階函式,這個函式的輸入引數func也是乙個函式。

操作含義

filter(func)

篩選出滿足func的元素,並返回乙個新的資料集

map(func)

將每個元素傳遞到函式func中,並將結果返回為乙個新的資料集

flatmap(func)

與map()相似,但每個輸入元素都可以對映到0或多個輸出結果

groupbykey()

應用於(k,v)鍵值對的資料集時,返回乙個新的(k,iterable)形式的資料集

reducebykey(func)

應用於(k,v)鍵值對的資料集時,返回乙個新的(k,v)形式的資料集,其中每個值是將每個key傳遞到函式func中進行聚合後的結果

filter(func)操作會篩選處滿足函式func的元素,並返回乙個新的資料集。例如:

執行lines.filter()操作,filter()的輸入引數line=>line.contains("spark")是乙個匿名函式。line.filter(line=>line.contains("spark「))操作的含義是,依次取出lines這個rdd中的每乙個元素,對於當前取到的元素,把它賦值給匿名函式式中的line變數,然後,執行匿名函式體部分line.contains("spark"),如果line中包含"spark"這個單詞,就把這個元素加入到新的rdd(即lineswithspark)中,否則,就丟棄該元素。最終,新生成的rdd中的所有元素,都包含了單詞"spark"。

map(func)操作將每個元素傳遞到函式func中,並將結果返回乙個新的資料集。

第一行語句建立了乙個包含5個int型別元素的陣列array。第2行語句執行sc.parallelize(0,從陣列array中生成乙個rdd,即rdd1,rdd1中包含5個int型別的元素,即1、2、3、4、5。第三行語句執行rdd1.map()操作,map()的輸入引數"x=>x+10"是乙個匿名函式。依此取出rdd1中的每個元素,分別+10,作為函式的返回值,並作為乙個元素放入到新的rdd(即rdd2)中。

flatmap(func)與map()相似,但每個輸入元素都可以對映到0或多個輸出結果。例如

第一步:map()。執行lines.map(line=>line.split(" ")操作,從lines轉換得到乙個新的rdd(即wordarray),wordarray中的每個元素都是乙個陣列物件。例如,第1個元素是array("hadoop","is","good"),第2個元素是array("spark","is","fast"),第三個元素是array("sark","is","better")。

第二步:拍扁(flat)。flatmap()操作中的flat是乙個很形象的動作——」拍扁",也就是把wordarray中的每個rdd元素都拍扁成多個元素,最終,所有這些被拍扁以後得到的元素,構成乙個新的rdd,即words。

groupbykey()應用於(k,v)鍵值對的資料集時,返回乙個新的(k,iterable)形式的資料集。

如上所示l("is",1)("is",1)這2個鍵值對的key相同,歸併為乙個新的鍵值對("is",(1,1))

reducebykey(func)應用於(k,v)鍵值對的資料集時,返回乙個新的(k,v)形式的資料集,其中的每個值是將每個key傳遞到函式func中進行聚合後得到的結果。

所有key相同的鍵值對,它們的value首先被歸併到一起,然後使用func函式把(1,1)聚合在一起。

行動操作是真正觸發計算的地方。spark程式只有執行到行動操作時,才會執行真正的計算,從檔案中載入資料,完成一次又一次轉換操作,最終,完成行動操作得到結果。下表列出了常用的rdd行動操作api。

操作含義

count()

返回資料集中的元素個數

collect()

以陣列的形式返回資料集中的所有元素

first()

返回資料集中的第乙個元素

take(n)

以陣列的形式返回資料集中的前n個元素

reduce(func)

通過函式func(輸入兩個引數並返回乙個值)聚合資料集中的元素

foreach(func)

將資料集中的每個元素傳遞到函式func中執行

當採用local模式在單擊上執行時,rdd.foreach(println)語句會列印出乙個rdd中的所有元素。但是,當採用集群模式執行時,在worker節點上執行列印語句到worker不會現實列印語句的這些輸出內容的。

所謂的「惰性機制」是指,整個轉換過程只是記錄了轉換的軌跡,並不會傳送真正的計算,只有遇到行動操作時,才會觸發「從頭到尾:的真正計算。

在spark中,rdd採用惰性求值的機制,每次遇到行動操作,都會從頭開始計算。每次遇到行動操作,都會觸發一次從頭開始的計算,對於迭代計算而言,代價是很大的,因為迭代計算經常需要多次重複使用同一組資料。下面就是多次計算同乙個rdd的例子:

實際上,可以通過持久化(快取)機制來避免這種重複計算的開銷。具體方法是使用persist()方法對乙個rdd標記尾持久化,之所以說」標記為持久化「,是因為出現persist()語句的地方,並不會馬上計算生成rdd並把它持久化,而是要等到遇到第乙個行動操作觸發真正計算以後,才會把計算結果進行持久化,持久化後的rdd將會被保留在計算節點的記憶體中,被後面的行動操作重複使用。

persist()的圓括號中包含的是持久化級別引數,可以有如下不同的級別:

持久化rdd會占用記憶體空間,當不再需要乙個rdd時,就可以使用uppersist()方法手動地把持久化的rdd從快取中移除,釋放空間記憶體。

spark學習 RDD程式設計

rdd建立 從從檔案系統中載入資料建立rdd 1.spark採用textfile 從檔案系統中載入資料建立rdd 可以使本地,分布式系統等 2.把檔案的url作為引數 可以是本地檔案系統的位址,分布式檔案系統hdfs的位址等等 從本地檔案中載入資料 sc為系統自動建立的sparkcontext,不用...

Spark學習 RDD程式設計基礎

spark上開發的應用程式都是由乙個driver programe構成,這個所謂的驅動程式在spark集群通過跑main函式來執行各種並行操作。集群上的所有節點進行平行計算需要共同訪問乙個分割槽元素的集合,這就是rdd rdd resilient distributed dataset 彈性分布式資...

Spark程式設計模型 RDD

spark程式設計模型是彈性分布式資料集 resilient distributed dataset,rdd 是mapreduce模型的擴充套件和延伸 基於rdd機制實現了多類模型計算,如 1.迭代計算 2.互動式sql查詢 3.mapreduce rdd 4.流式資料處理。markdown 是一種...