一、spark資料分割槽方式簡要
在spark中,rdd(resilient distributed dataset)是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣列,其中partition是陣列中的每個元素,並且這些元素分布在多台機器中。圖一中,rdd1包含了5個partition,rdd2包含了3個partition,這些partition分布在4個節點中。
spark包含兩種資料分割槽方式:hashpartitioner(雜湊分割槽)和rangepartitioner(範圍分割槽)。一般而言,對於初始讀入的資料是不具有任何的資料分割槽方式的。資料分割槽方式只作用於形式的資料。因此,當乙個job包含shuffle操作型別的運算元時,如groupbykey,reducebykey etc,此時就會使用資料分割槽方式來對資料進行分割槽,即確定某乙個key對應的鍵值對資料分配到哪乙個partition中。在spark shuffle階段中,共分為shuffle write階段和shuffle read階段,其中在shuffle write階段中,shuffle map task對資料進行處理產生中間資料,然後再根據資料分割槽方式對中間資料進行分割槽。最終shffle read階段中的shuffle read task會拉取shuffle write階段中產生的並已經分好區的中間資料。圖2中描述了shuffle階段與partition關係。下面則分別介紹spark中存在的兩種資料分割槽方式。
二、hashpartitioner(雜湊分割槽)
1、hashpartitioner原理簡介
hashpartitioner採用雜湊的方式對鍵值對資料進行分割槽。其資料分割槽規則為 partitionid = key.hashcode % numpartitions,其中partitionid代表該key對應的鍵值對資料應當分配到的partition標識,key.hashcode表示該key的雜湊值,numpartitions表示包含的partition個數。圖3簡單描述了hashpartitioner的資料分割槽過程。
2、hashpartitioner原始碼詳解
hashpartitioner原始碼較為簡單,這裡不再進行詳細解釋。
classhashpartitioner(partitions: int) extends partitioner override def equals(other: any): boolean =other match override def hashcode: int =numpartitions
def nonnegativemod(x: int, mod: int): int=.collect()
val numitems=sketched.map(_._2).sum
(numitems, sketched)
④ 資料抽樣完成後,需要對不均衡的partition重新進行抽樣,預設當partition中包含的資料量大於平均值的三倍時,該partition是不均衡的。當取樣完成後,利用樣本容量和rdd中包含的資料總量,可以得到整體的乙個資料取樣率fraction。利用此取樣率對不均衡的partition呼叫sample運算元重新進行抽樣。
//計算資料取樣率
val fraction = math.min(samplesize / math.max(numitems, 1l), 1.0)//存放取樣key以及取樣權重
val candidates =arraybuffer.empty[(k, float)]//存放不均衡的partition
val imbalancedpartitions =mutable.set.empty[int]//(idx, n, sample)=> (partition id, 當前分割槽資料個數,當前partition的取樣資料)
sketched.foreach //在三倍之內的認為沒有發生資料傾斜
else//對於非均衡的partition,重新採用sample運算元進行抽樣
if(imbalancedpartitions.nonempty)
bounds.toarray
⑥ 計算每個key所在partition:當分割槽範圍長度在128以內,使用順序搜尋來確定key所在的partition,否則使用二分查詢演算法來確定key所在的partition。
* 獲得每個key所在的partitionid*/def getpartition(key: any): int=
}//範圍大於128,則進行二分搜尋該key所在範圍,即可得到該key所在的partitionid
elseif (partition >rangebounds.length) if(ascending) else
具體案例:對list裡面的單詞進行wordcount,並且輸出按照每個單詞的長度分割槽輸出到不同檔案裡面
classmypartitioner(val num:int) extends partitioner )//這裡指定自定義分割槽,然後輸出
println(rdd2.collect().tobuffer)
sc.stop()
結果:因為這裡定義的是4個partition 所以最後產生4個檔案
其中part-00000 和 part-00001如下:
其中part-00002 和 part-00003如下:
其中part-00000中zhangsan的長度對4取模為0和這個檔案中其他較短的單詞一樣,所以在乙個分割槽, part-00003沒有內容,說明上面的單詞的長度對4取模結果沒有為3的
spark更改分割槽 如何管理Spark的分割槽
當我們使用spark載入資料來源並進行一些列轉換時,spark會將資料拆分為多個分割槽partition,並在分割槽上並行執行計算。所以理解spark是如何對資料進行分割槽的以及何時需要手動調整spark的分割槽,可以幫助我們提公升spark程式的執行效率。什麼是分割槽 關於什麼是分割槽,其實沒有什...
spark分割槽器
spark的分割槽器 只有涉及到 key value 型別的rdd才會用到分割槽器,因為分割槽是以key分割槽的 spark中分割槽器直接決定了rdd中分割槽的個數 rdd中每條資料經過shuffle過程屬於哪個分割槽和reduce的個數。a hashpartitioner 預設分割槽器 hash分...
Spark中RDD分割槽以及節點
spark中rdd分割槽 對於二元rdd使用時,例如在使用join 時 我們對資料集是如何分割槽的卻一無所知。預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那台機器上對所有鍵相同的記錄進行連線操作,會非常消耗效能,如果乙個資料集設...