五、spark資料核心--rdd
六、建立rdd
spark是基於記憶體的分布式計算框架,特點是快速、易用、通用及多種執行模式。
spark架構主要由以下元件構成:
cluster manager : 在集群(standalone、mesos、yarn)上獲取資源的外部服務
task: 被送到某個 executor上的工作單元
stage: 每個job會被拆分成多組 task,作為乙個 taskset,其名稱為 stage
執行架構:
spark執行流程:
spark 中master,worker,driver和executor之間的關係
在開發過程中,常用api主要有: sparkcontext、 sparksession、 rdd、 dataset及 dataframe,本文主要介紹 sparkcontext、 sparksession。
在idea中建立sparkcontext,**如下:
//導包
import org.apache.spark.
//建立乙個sparkcontext物件
val conf=
newsparkconf()
.setmaster
("local[2]").
("hellospark"
)val sc=sparkcontext.
getorcreate
(conf)
在idea中建立sparksession,**如下:
//導包
import org.apache.spark.sql.sparksession
//建立乙個sparksession物件
val spark = sparksession.builder
.master
("local[2]").
().getorcreate()
val sc: sparkcontext = spark.sparkcontext
rdd稱為 彈性分布式資料集( resilient distributed datasets),它是一種分布式的記憶體抽象,允許在大型集群上執行基於記憶體的計算( in memory computing),為使用者遮蔽了底層複雜的計算和對映環境。
彈性:指在任何時候都能進行重算,這樣當集群中的一台機器掛掉而導致儲存在其上的rdd丟失後,spark還可以重新計算出這部分的分割槽的資料
分布式:資料計算分布於多節點
資料集:rdd並不儲存真正的資料,只是對資料和操作的描述。它是唯讀的、分割槽記錄的集合,每個分割槽分布在集群的不同節點上
簡單來說,rdd是將資料項拆分為多個分割槽的集合,儲存在集群的工作節點上的記憶體和磁碟中,並執行正確的操作 。
更規範的解釋是:
每個分割槽上都有compute函式,計算該分割槽中的資料
rdd有依賴性,通常情況下乙個 rdd是**於另乙個 rdd,這個叫做lineage。rdd會記錄下這些依賴,方便容錯。也稱dag。
只有key-value型別的 rdd才有分割槽器 ,可以傳遞乙個自定義的partitioner進行重新分割槽,非key-value型別的 rdd(pairrdd)分割槽器的值是 none。
不同的 rdd的compute函式邏輯各不一樣,比如:
hadooprdd的compute是讀取指定partition資料。因為sc.hadoopfile(「path」)」讀取 hdfs檔案返回的rdd具體型別便是 hadooprdd,所以只需要讀取資料即可。
checkpointrdd的compute是直接讀取檢查點的資料。一旦 rdd進行checkpoint,將變成checkpointrdd
該列表儲存了訪問每個分割槽的優先位置 。對於乙個 hdfs檔案來說,這個列表儲存了每個分割槽所在的資料塊的位置。按照「移動資料不如移動計算的」的理念, spark在進行任務排程的時候,會盡可能的將計算任務移動到所要處理的資料塊的儲存位置。
通過集合建立rdd有兩種方法:parallelize與makerdd
makerdd多乙個過載方法:過載分配一系列本地scala集合形成乙個rdd,可以為每個集合物件建立乙個分割槽,並指定優先位置便於在執行中優化排程。
使用本地集合建立rdd的問題在於:由於這種方法需要用到一台機器中集合的全部資料,所以這種方式在測試和原型構造之外很少使用,一般在測試時使用
使用parallelize建立rdd:
import org.apache.spark.rdd.rdd
import org.apache.spark.
object sparkcontextdemo extends
import org.apache.spark.sparkcontext
import org.apache.spark.sql.sparksession
object sparksessiondemo extends
注:
sc.
textfile
("/my/directory"
)sc.
textfile
("/my/directory/*.txt"
)sc.
textfile
("/my/directory/*.gz"
)
sparkcontext.wholetextfiles():可以針對乙個目錄中的大量小檔案返回作為pairrdd。spark 為包含鍵值對型別的 rdd 提供了一些專有的操作,比如:reducebykey()、groupbykey()……
也可以通過鍵值對集合建立pairrdd:sc.parallelize(list((1,2),(1,3)))
示例:idea src的data目錄中有兩個檔案:hello.txt、test01.txt
內容如下:
建立乙個pairpdd讀取資料:
import org.apache.spark.rdd.rdd
import org.apache.spark.
object creatrdddemo extends
列印結果如下圖:
Spark架構及原理
原則一 避免建立重複的rdd 原則二 盡可能用同乙個rdd 原則三 對多次使用的rdd進行持久化 如何選擇一種最合適的持久化策略 原則四 盡量避免使用shuffle類運算元 原則五 使用map side預聚合的shuffle操作 groupbykey 和 reducebykey 原則六 使用高效能的...
Spark基礎概念01 初識Spark架構和RDD
四 核心api 五 rdd是什麼,有哪些特點 六 rdd的特性 七 rdd常用的建立方式 八 rdd常用的運算元 轉換 動作 九 基於rdd的應用程式開發 十 shuffle機制 十一 累加器 可自定義 1 在驅動程式中,通過sparkcontext主導應用的執行 2 sparkcontext可以連...
spark簡述,安裝
hadoop mapreduce框架 平行計算的思想 分而治之的思想 scala集合高階函式 處理資料的思想 將 要分析的資料放到集合中去,然後呼叫集合的高階函式處理資料 統一分析引擎為海量資料處理 統一 什麼樣的資料都能處理分析,什麼型別的資料都可以處理,實時,離線,流式都可以 mapreduce...