spark 學習 二 RDD及共享變數

2021-09-07 13:50:12 字數 2961 閱讀 9671

宣告:本文基於spark的programming guide,並融合自己的相關理解整理而成

spark應用程式總是包括著乙個driver program(驅動程式),它執行著使用者的main方法,而且執行大量的並行操作(parallel operations)在集群上.

spark最基本的抽象就是rdd(resilient distributed dataset) 彈性分布式資料集,rdd  就是切割元素的集合,他被分發在集群的各個節點上,而且可以進行並行操作.

rdd的建立有三種方式:

rdd可以持久化到記憶體中以反覆使用加速計算速度, rdd可以自己主動從失敗的節點中恢復(血統設計).

spark中的還有乙個抽象就是可以被用於平行計算的共享變數. 預設的情況下, spark並行執行乙個函式是作為一組tasks在不同的節點上同一時候計算的, 這樣的情況下,他是通過分發每個變數的複製到每個task中的. 有時候,我們須要某些變數在tasks之間進行共享. 這裡spark支援兩種共享變數:

broadcast variables, 被用於持久化變數在每乙個node的記憶體中;

accumulators,  這個變數僅僅可以被累加,

操作主要包含兩種,各自是transformations 和 action

transformation : 將乙個已經存在的rdd中轉換成乙個新的rdd,全部的轉換操作都是lazy執行的,即僅僅是記下了執行的動作,僅僅有當driver程式須要結果的時候才會進行計算. 

action:一般用於對rdd中的元素進行實際的計算,然後返回對應的值,比如reduce操作,collect操作,count操作等等.這中action之後返回的就不在是rdd了

rdd基本操作的幾個樣例以及自己的理解:

//spark://host:port

val sc = new sparkcontext(conf)

/*** parallelized collections

* 將scala的集合資料,並行化成為能夠平行計算的分布式資料集

*/val data = 1 to 1000 toarray

val distdata = sc.parallelize(data,10)

//後面的數字是表示將集合切分成多少個塊 ,一般是乙個cpu 2-4塊,通常spark能夠自己主動幫你切分

val sum = distdata.reduce((a, b) => a+b )

//在reduce的時候才開始真正的執行,driver將任務分布到各個機器上,然後每乙個機器單獨執行,將計算的結果返回到driver程式

println("sum " + sum)

/*** 讀取外部的資料來源

* 1.hadoop支援的資料來源 ,比如hdfs,cassandra,hbase ,amazon s3

* ##假設檔案位址是本地位址的話,那麼他應該在集群的每乙個節點上都能夠被訪問(即:每乙個節點上都應該有相同的檔案)

* ##textfile的第二個引數控制檔案被分割的大小默覺得64mb ,能夠設定更大的可是不能設定更小的

*/val distfile = sc.textfile("file:///usr/local/spark/readme.md")

//接下來就能夠進行相關的操作了

distfile.persist()//持久化

val len = distfile.map(s => 1).reduce((a, b) => a+b)

println(len)

val words = distfile.flatmap(l => l.split(" ")).map(w => (w,1)).reducebykey((a,b) => a+b)

//w => (v1+v2+v3+...)

//map => 1->1 , flatmap => 1 -> 0..n

print(words.count())

words foreach println

val twords = distfile.flatmap(l => l.split(" ")).map(w => (w,1)).groupbykey()

//分組 w => (v1, v2, v3 ...)

twords foreach println

//.map(w => (w,1)).foreach(w => w._1);

使用方法: 使用persist()或者cache()方法,當中cache()方法預設持久化到記憶體,persist能夠自己選擇持久化的層次,在shuffle操作中,spark會自己主動儲存中間計算結果,比如reducebykey

作用:  rdd的持久化會將會使得每乙個節點儲存對應的計算部分,以便再次使用該資料集時能夠直接使用,加快計算速度

怎樣選擇持久化層次: 假設rdds 在memory_only下表現良好的話,就選這個層次,這樣cpu效率最高

其次memory_only_ser ,其它情況

1. broadcast 變數, 僅僅讀的共享變數 每乙個節點上都有乙個拷貝, 使用方法

val broadcastvar = sc.broadcast("string test")

broadcastvar

.value

2.accumulator 變數,做累加器用,類似與counter或者是sum

val broadcastvar = sc.broadcast("string test")//broadcast variable is readonly

val v = broadcastvar.value

println(v)

val accum = sc.accumulator(0, "my accumulator")//value and name

sc.parallelize(1 to 1000000).foreach(x => accum+= 1)

println(accum.name + ":" + accum.value)

spark學習(二)RDD和DAG

rdd resilient distributed dataset 叫做彈性分布式資料集,是spark中最基本也是最重要的概念之一。它是spark中一種基本的資料抽象,有容錯機制並可以被並行操作的元素集合,具有唯讀 分割槽 容錯 高效 無需物化 可以快取 rdd依賴等特徵。rdd的知識較為龐雜,這裡...

spark學習 RDD程式設計

rdd建立 從從檔案系統中載入資料建立rdd 1.spark採用textfile 從檔案系統中載入資料建立rdd 可以使本地,分布式系統等 2.把檔案的url作為引數 可以是本地檔案系統的位址,分布式檔案系統hdfs的位址等等 從本地檔案中載入資料 sc為系統自動建立的sparkcontext,不用...

Spark工作原理及RDD

1.基於記憶體 2.迭代式計算 3.分布式 基本工作原理 將spark的程式提交到spark集群上,在hadoop的hdfs或者hive上讀取資料,讀取的資料存放在各個spark的節點上,分布式的存放在多個節點上,主要在每個節點的記憶體上,這樣可以加快速度。對節點的資料進行處理,處理後的資料存放在其...