spark的優化 控制資料分割槽和分布

2021-07-23 01:57:48 字數 2880 閱讀 1114

資料分割槽:

在分布式集群裡,網路通訊的代價很大,減少網路傳輸可以極大提公升效能。

mapreduce框架的效能開支主要在io和網路傳輸,io因為要大量讀寫檔案,它是不可避免的,但是網路傳輸是可以避免的,把大檔案壓縮變小檔案,從而減少網路傳輸,但是增加了cpu的計算負載。

spark裡面io也是不可避免的,但是網路傳輸spark裡面進行了優化:

spark把rdd進行分割槽(分片),放在集群上平行計算。

同乙個rdd分片100個,10個節點,平均乙個節點10個分割槽

當進行sum型的計算的時候,先進行每個分割槽的sum,然後把sum值shuffle傳輸到主程式進行全域性sum,所以進行sum型計算對網路傳輸非常小。

但對於進行join型的計算的時候,需要把資料本身進行shuffle,網路開銷很大。

spark是如何優化這個問題的呢?

spark把key-value rdd通過key的hashcode進行分割槽,而且保證相同的key儲存在同乙個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程

我們進行mapreduce計算的時候為什麼要盡興shuffle?,就是說mapreduce裡面網路傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網路的,它要把所有的資料混在一起走網路,然後它才能把相同的key走到一起。要盡興shuffle是儲存決定的。

spark從這個教訓中得到啟發,spark會把key進行分割槽,也就是key的hashcode進行分割槽,相同的key,hashcode肯定是一樣的,所以它進行分割槽的時候100t的資料分成10分,每部分10個t,它能確保相同的key肯定在乙個分割槽裡面,而且它能保證儲存的時候相同的key能夠存在同乙個節點上。

比如乙個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一分稱為每個分割槽,spark能保證相同的key存在同乙個節點上,實際上相同的key存在同乙個分割槽。

key的分布不均決定了有的分割槽大有的分割槽小。沒法分割槽保證完全相等,但它會保證在乙個接近的範圍。

所以mapreduce裡面做的某些工作裡邊,spark就不需要shuffle了,spark解決網路傳輸這塊的根本原理就是這個。

進行join的時候是兩個表,不可能把兩個表都分割槽好,通常情況下是把用的頻繁的大表事先進行分割槽,小表進行關聯它的時候小表進行shuffle過程。

大表不需要shuffle。

模版是:

val userdata = sc.sequencefile[userid,userinfo]("hdfs://...")

.partitionby(new hashpartition(100))//構造100個分割槽

.persist()

從分割槽中獲益的操作:cogroup(), groupwith(),join(),leftouterjoin(),rightouterjoin(),groupbykey(),reducebykey(),cobimebykey(),lookup()

所有基於key的操作都會獲益

對於諸如cogroup()和join()這樣的二元操作,預先進行資料分割槽會讓其中至少乙個rdd(使用已知分割槽器的那個rdd)不發生資料shuffle,如果兩個rdd使用同樣的分割槽方式,並且它們還快取在同樣的機器上(比如乙個rdd是通過mapvalues()從另乙個rdd中建立出來的,這兩個rdd就會擁有相同的key和分割槽方式),或者其中rdd還沒有被計算出來,那麼跨界點的shuffle(資料混洗)不會發生了。

mapreduce一般要求本地網絡卡達到20兆!即便進行了壓縮!

**:

importorg.apache.hadoop.hive.ql.exec.persistence.hybridhashtablecontainer.hashpartition

importorg.apache.hadoop.mapred.lib

importorg.apache.spark.sparkconf

importorg.apache.spark.sql.sparksession

importorg.apache.spark.storage.storagelevel

importorg.apache.spark.hashpartitioner

/*** created by zengxiaosen on 16/9/23.

*/objectpartitionvisitcount .partitionby(newhashpartitioner(10)) //採用了hashcode分片方式,分成了10份,十個分割槽,每個分割槽10分

/*相同的key在同乙個分割槽,在進行任務呼叫時候,大表不需要任何shuffle

只需要shuffle小表

*/.persist(storagelevel.disk_only

) /*

parallelize有兩個引數,第乙個是他的list,第二個是分割槽數

分割槽數可以不給,不給的情況下預設就是它的核數

*///比如裡面存的是我的使用者id

valrdd = sc.parallelize(list(1,

2,3,

4,5,

6,7),10)

.map(i => (i+""

, i+"str"))

filerdd.join(rdd).foreach(println)

/*如果filerdd後面還會關聯好多個其他的rdd1,rdd2。。。rddn

就要先把大的filerdd進行分割槽

這樣優化了網路傳輸*/}

}

spark更改分割槽 Spark中的分割槽方法詳解

一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...

spark關於分割槽和sortBy的學習

首次學習spark時,對分割槽沒有直觀的了解,在使用sortby方式時也不能得預期的結果,通過實踐了解spark分割槽和sortby的原理 val sc new sparkcontext conf master設定為 local 4 利用4個執行緒 executor 來測試,模擬分布式環境 val ...