資料分割槽:
在分布式集群裡,網路通訊的代價很大,減少網路傳輸可以極大提公升效能。
mapreduce框架的效能開支主要在io和網路傳輸,io因為要大量讀寫檔案,它是不可避免的,但是網路傳輸是可以避免的,把大檔案壓縮變小檔案,從而減少網路傳輸,但是增加了cpu的計算負載。
spark裡面io也是不可避免的,但是網路傳輸spark裡面進行了優化:
spark把rdd進行分割槽(分片),放在集群上平行計算。
同乙個rdd分片100個,10個節點,平均乙個節點10個分割槽
當進行sum型的計算的時候,先進行每個分割槽的sum,然後把sum值shuffle傳輸到主程式進行全域性sum,所以進行sum型計算對網路傳輸非常小。
但對於進行join型的計算的時候,需要把資料本身進行shuffle,網路開銷很大。
spark是如何優化這個問題的呢?
spark把key-value rdd通過key的hashcode進行分割槽,而且保證相同的key儲存在同乙個節點上,這樣對改rdd進行key聚合時,就不需要shuffle過程
我們進行mapreduce計算的時候為什麼要盡興shuffle?,就是說mapreduce裡面網路傳輸主要在shuffle階段,shuffle的根本原因是相同的key存在不同的節點上,按key進行聚合的時候不得不進行shuffle。shuffle是非常影響網路的,它要把所有的資料混在一起走網路,然後它才能把相同的key走到一起。要盡興shuffle是儲存決定的。
spark從這個教訓中得到啟發,spark會把key進行分割槽,也就是key的hashcode進行分割槽,相同的key,hashcode肯定是一樣的,所以它進行分割槽的時候100t的資料分成10分,每部分10個t,它能確保相同的key肯定在乙個分割槽裡面,而且它能保證儲存的時候相同的key能夠存在同乙個節點上。
比如乙個rdd分成了100份,集群有10個節點,所以每個節點存10份,每一分稱為每個分割槽,spark能保證相同的key存在同乙個節點上,實際上相同的key存在同乙個分割槽。
key的分布不均決定了有的分割槽大有的分割槽小。沒法分割槽保證完全相等,但它會保證在乙個接近的範圍。
所以mapreduce裡面做的某些工作裡邊,spark就不需要shuffle了,spark解決網路傳輸這塊的根本原理就是這個。
進行join的時候是兩個表,不可能把兩個表都分割槽好,通常情況下是把用的頻繁的大表事先進行分割槽,小表進行關聯它的時候小表進行shuffle過程。
大表不需要shuffle。
模版是:
val userdata = sc.sequencefile[userid,userinfo]("hdfs://...")
.partitionby(new hashpartition(100))//構造100個分割槽
.persist()
從分割槽中獲益的操作:cogroup(), groupwith(),join(),leftouterjoin(),rightouterjoin(),groupbykey(),reducebykey(),cobimebykey(),lookup()
所有基於key的操作都會獲益
對於諸如cogroup()和join()這樣的二元操作,預先進行資料分割槽會讓其中至少乙個rdd(使用已知分割槽器的那個rdd)不發生資料shuffle,如果兩個rdd使用同樣的分割槽方式,並且它們還快取在同樣的機器上(比如乙個rdd是通過mapvalues()從另乙個rdd中建立出來的,這兩個rdd就會擁有相同的key和分割槽方式),或者其中rdd還沒有被計算出來,那麼跨界點的shuffle(資料混洗)不會發生了。
mapreduce一般要求本地網絡卡達到20兆!即便進行了壓縮!
**:
importorg.apache.hadoop.hive.ql.exec.persistence.hybridhashtablecontainer.hashpartitionimportorg.apache.hadoop.mapred.lib
importorg.apache.spark.sparkconf
importorg.apache.spark.sql.sparksession
importorg.apache.spark.storage.storagelevel
importorg.apache.spark.hashpartitioner
/*** created by zengxiaosen on 16/9/23.
*/objectpartitionvisitcount .partitionby(newhashpartitioner(10)) //採用了hashcode分片方式,分成了10份,十個分割槽,每個分割槽10分
/*相同的key在同乙個分割槽,在進行任務呼叫時候,大表不需要任何shuffle
只需要shuffle小表
*/.persist(storagelevel.disk_only
) /*
parallelize有兩個引數,第乙個是他的list,第二個是分割槽數
分割槽數可以不給,不給的情況下預設就是它的核數
*///比如裡面存的是我的使用者id
valrdd = sc.parallelize(list(1,
2,3,
4,5,
6,7),10)
.map(i => (i+""
, i+"str"))
filerdd.join(rdd).foreach(println)
/*如果filerdd後面還會關聯好多個其他的rdd1,rdd2。。。rddn
就要先把大的filerdd進行分割槽
這樣優化了網路傳輸*/}
}
spark更改分割槽 Spark中的分割槽方法詳解
一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...
spark 的RDD分割槽
rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...
spark關於分割槽和sortBy的學習
首次學習spark時,對分割槽沒有直觀的了解,在使用sortby方式時也不能得預期的結果,通過實踐了解spark分割槽和sortby的原理 val sc new sparkcontext conf master設定為 local 4 利用4個執行緒 executor 來測試,模擬分布式環境 val ...