Spark Core Spark核心部分

2021-10-10 05:55:00 字數 4577 閱讀 6589

spark核心部分總結

rdd resilient distributed dataset 彈性分布式資料集

rdd由一組分割槽組成,每乙個block塊對應乙個分割槽

函式實際上是作用在每乙個分割槽上,每乙個分割槽都會有乙個task來處理

rdd之間存在依賴關係

寬依賴

窄依賴

分割槽的運算元必須作用在key-value格式的rdd上

spark為task的計算提供了最佳計算位置,會盡量將task傳送到資料所在的節點執行,移動計算而不是移動資料

taskscheduler知道所有資料的位置

task由taskscheduler傳送

轉換運算元

操作運算元

轉換運算元是懶執行模式,即存在於邏輯層面,實際上並沒有做任何操作,只有在出現操作運算元時,才會將**執行。

persist方法

val rdd1: rdd[

string

]= sc.textfile(

"spark/data/students.txt"

)val sturdd: rdd[

string

]= rdd1.map(line =>

)//嘗試不加後上面的map出現幾次,相當於 sturdd.persist(storagelevel.memory_only)

sturdd.cache(

)//快取到記憶體,記憶體足夠為第一選擇

// sturdd.persist(storagelevel.memory_only)

//// 第二選擇,記憶體加壓縮資料

// sturdd.persist(storagelevel.memory_only_ser)

//// 第三選擇,記憶體加壓縮資料加磁碟

// sturdd.persist(storagelevel.memory_and_disk_ser)

//統計班級人數

sturdd

.map(line =>

(line.split(

",")(4

),1)

).reducebykey(_+_)

.foreach(println)

//統計性別人數

sturdd

.map(line =>

(line.split(

",")(3

),1)

).reducebykey(_+_)

.foreach(println)

1.當第乙個job執行完成之後,會向前回溯,如果有rdd做了checkpoint,會打上乙個標記

2. 重新啟動乙個job任務計算rdd 的資料,將rdd的資料儲存到hdfs

在checkpoint之前可以先cache一下

//設定checkpoint位置

sc.setcheckpointdir(

"spark/data/checkpoint"

)//讀取分數表

val studentrdd: rdd[

string

]= sc.textfile(

"spark/data/students.txt"

)val sturdd: rdd[

string

]= studentrdd.map(line =>

)/**

* checkpoint : 快照, 將rdd的資料持久化到hdfs, 資料不會丟失

*///優化

sturdd.cache(

) sturdd.checkpoint(

)// 統計i班級人數

sturdd

.map(line =>

(line.split(

",")(4

),1)

).reducebykey(_ + _)

.foreach(println)

// 統計性別人數

sturdd

.map(line =>

(line.split(

",")(3

),1)

).reducebykey(_ + _)

.foreach(println)

sturdd

.map(line =>

(line.split(

",")(3

),1)

).reducebykey(_ + _)

.foreach(println)

使用廣播變數在driver端定義乙個廣播變數

在運算元中如果使用到廣播變數,先去executor中獲取

如果executor中沒有這個廣播變數,executor會去driver端獲取廣播變數

後續的task就可以直接使用

val student: rdd[

string

]= sc.textfile(

"students.txt"

)val list: list[

string

]= list(

"文科一班"

,"文科二班"

,"理科一班"

)//將集合廣播

val broadlist: broadcast[list[

string]]

= sc.broadcast(list)

val result: rdd[

string

]= student.filter(line =>

) result.foreach(println)

在driver定義乙個累加器

在executor端進行累加

在driver端讀取累加結果

//定義累加器,預設值是0

val acc: longaccumulator = sc.longaccumulator

rdd.foreach(i =>

) println(acc.value)

//不使用累加器

var a:

int=

0 rdd.foreach(i =>

) println(a)

rdd快取的資料

廣播變數和累加器

shuffle檔案

hashshuffle 小檔案數 = map數 * reduce數

hashshufflemanager 小檔案數 = core數 * reduce數

sortshuffle(預設) 小檔案數 = 2 * map數

sortshuffle bypass機制,在溢寫的過程中不排序,當reduce的數量小於200的時候會觸發

driver在本地啟動

向rm申請啟動am

am啟動後向rm申請資源

rm分配資源啟動executor

executor啟動之後反向註冊給driver

driver和am合二為一,功能合併

am啟動後向rm申請資源

rm分配資源啟動executor

executor啟動之後反向註冊給driver

當**中遇到action運算元的時候開始任務排程

構建dag有向無環圖

dagscheduler根據寬窄依賴切分stage

dagscheduler將stage以taskset的形式傳送給taskscheduler

taskscheduler將task傳送到executor中執行(會盡量將task傳送到資料所在的節點執行)

executor向taskscheduler匯報任務執**況

8核 6核 4核 雙核CPU是什麼意思

對於初學者來說,cpu是什麼 什麼是雙核 4核 6核 8核等。下面,就以上的問題,我們做出一一解答。概念 cpu是什麼 做什麼用 一般cpu是接在 的,我們先來看看cpu是什麼,cpu既 處理器 電腦中乙個最重要,最核心的東西,市面上能買到的cpu只有兩種,一種是intel公司生產的,另一種是amd...

核模型(核密度估計)

note 是在基函式設計的時候使用到樣本,那麼訓練的是什麼?下面看公式。核模型,是以使用被稱為 核模型的 二元函式 k 的線性結合方式加以定義的。上面的theta就是我們要學習的物件 注意 theta為乙個向量,可以表述為下面的形式。可以把公式中的xi看做標記點 landmark 並將樣本x和標記點...

AMD3核開4核成功

現在amd流行開核 趁著心情鬱悶 偶也嘗試一下。主機板是技嘉ma770 us3,處理器是amd x3 720。偶買的主機板雖然是rev.2.0版本的,但是預設沒有ec fireware的選項,上網down了個bios公升級檔案 已經上傳附件了 用技嘉光碟帶的 bios程式載入安裝,一分鐘左右的時間就...