在分布式程式中,通訊的代價是很大的,因此控制資料分布以獲得最少的網路傳輸可以極大地提公升整體效能。和單節點的程式需要為記錄集合選擇合適的資料結構一樣,spark程式可以通過控制rdd分割槽方式來減少通訊開銷。而只有當資料集多次在諸如連線這種基於鍵的操作使用時,分割槽才會有幫助。
spark中所有的鍵值對rdd都可以進行分割槽。系統會根據乙個針對鍵的函式對元素進行分組,並確保同一組的鍵出現在同乙個節點上。比如,可以使用雜湊分割槽將乙個rdd分成100個分割槽,此時鍵的雜湊值對100取模的結果相同的記錄會被放在乙個節點上;也可以使用範圍分割槽法,將鍵在同乙個範圍區間的記錄都放在同乙個節點上。
可以通過rdd的partitioner屬性來獲取rdd的分割槽方式。乙個應用的記憶體中儲存著一張較大的使用者資訊表,即由(userid,userinfo)對組成的rdd,其中userinfo包含乙個該使用者所訂閱的主題的列表。同時還有乙個小檔案,檔案內存放著過去五分鐘內使用者的訪問日誌,即由(userid,linkinfo)對組成的表。現在需要對使用者訪問其未訂閱主題的頁面情況進行統計,可以使用spark的join()來實現這個組合操作,其中需要把userinfo和linkinfo的有序對根據userid進行分組。
//初始化**,從hdfs上的乙個hadoop sequencefile中讀取使用者資訊
//userdata中的元素會根據它們被讀取時的**,即hdfs塊所在的節點來分布
//spark此時無法獲知某個特定的userid對應的記錄位於哪個節點上
val sc =
newsparkcontext(.
..)val userdata = sc.
sequencefile
("hdfs://...").
persist()
//週期性呼叫函式來處理過去五分鐘產生的事件日誌
//假設這是乙個包含(userid,linkinfo)對的sequencefile
def processuserinfo
(filename:string)
.count()
println
("number of visits to non-subscribed topics:"
+offtopicvists)
}
這段**可以執行,但是不夠高效,因為每次呼叫processuserinfo()時都會呼叫join()方法,預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器上,然後在那台機器上對所有鍵相同的記錄進行連線操作。因為userdata錶比每五分鐘出現的訪問日誌表events表要大得多,所以要浪費時間做很多額外工作:在每次呼叫時都對userdata表進行雜湊值計算和跨節點資料混洗,雖然這些資料從來都不會變化。
要解決這一問題可以在程式開始時,對userdata表使用partitionby()轉化操作,將這張表轉化為雜湊分割槽表,可以通過向partitionby()傳遞乙個spark.hashpartitioner物件來實現該操作。
val sc =
newsparkcontext(.
..)val userdata = sc.
sequencefile
("hdfs://...").
partitionby
(new
hashpartitioner
(100))
.persist
()
由於在構建userdata時呼叫了partitionby(),spark就知道了該rdd是根據鍵的雜湊值來分割槽的,這樣在呼叫userdata.join(events)時,spark就只會對events進行資料混洗操作,將events中特定的userid的記錄傳送到userdata的對應分割槽所在的那台機器上。這樣,需要通過網路傳輸的資料就大大減少了,程式執行速度也可以提公升。
注意:(1)對partitionby()的結果須進行持久化並儲存為uesrdata。否則後續的rdd操作會對userdata的整個譜系重新求值,導致對rdd一遍又一遍地進行雜湊分割槽和跨節點的混洗,和沒有指定分割槽方式時發生的情況十分相似。
(2)傳給partitionby()的100表示分割槽數目,它會控制之後對這個rdd進一步操作(如連線操作)時有多少任務會並行執行,這個值至少應該和集群中的總核心數一樣。
(3)除join()外還有其他很多操作也會利用到已有的分割槽資訊,比如sortbykey()和groupbykey()會分別生成範圍分割槽的rdd和雜湊分割槽的rdd。
spark的許多操作都引入了將資料根據鍵跨節點進行混洗的過程,所有這些操作都會從資料分割槽中獲益。能夠從資料分割槽中獲益的操作有:
cogroup()、groupwith()、join()、leftouterjoin()、rightouterjoin()、groupbykey()、reducebykey()、combinebykey()、lookup()。
對於join()這樣的二元操作,預先進行資料分割槽會導致其中至少乙個rdd(使用已知分割槽器的那個rdd)不發生資料混洗。
spark內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果rdd自動設定為對應的分割槽器。比如,使用join()連線兩個rdd時,由於鍵相同的元素會被雜湊到同一臺機器上,則輸出結果也是雜湊分割槽的。
不過,轉化操作的結果並不一定會按已知的分割槽方式分割槽,比如map()方法理論上可以改變元素的鍵,因此結果就不會有固定的分割槽方式。
spark會為生成的結果rdd設好分割槽方式的操作有:
cogroup()、groupwith()、join()、leftouterjoin()、rightouterjoin()、groupbykey()、reducebykey()、combinebykey()、partitionby()、sort()。
如果父rdd有分割槽方式的話,以下操作也會設有分割槽方式:
mapvalues()、flatmapvalues()、filter()。
其他操作生成的結果都不會存在特定的分割槽方式。所以為了最大化分割槽相關優化的潛在作用,應該在無需改變元素的鍵時盡量使用mapvalues()或flatmapvalues()。
對於二元操作,輸出資料的分割槽方式取決於父rdd的分割槽方式,預設情況下,結果會採用雜湊分割槽,分割槽的數量和操作的並行度一樣。如果其中乙個父rdd已經設定過分區方式,那麼結果就會採用那種分割槽方式,如果兩個父rdd都設定過分區方式,結果rdd會採用第乙個父rdd的分割槽方式。
class
defpartitioner
(partnum:int)
extends
partitioner
else
} override def equals
(other:any)
:boolean = other match
}
在程式中使用自定義分割槽函式,傳給partitionby()即可,比如,rdd.partitionby(new defpartitioner(5))。
spark中有很多依賴於資料混洗的方法,比如join()、groupbykey(),它們都可以接收乙個可選的partitioner物件來控制輸出資料的分割槽方式。
SQL高階高階
select top 50 percent from websites mysql 語法 oracle 語法 select column name s from table name limit number sql like 操作符 like 操作符用於在 where 子句中搜尋列中的指定模式。s...
MySQL高階高階
1 mysql的開窗函式 row number 從 1 開始,按照順序生成組內的記錄編號 rank 從 1 開始,按照順序,相同會重複,名次會留下空的位置,生成組內的記錄編號 dense rank 從 1 開始,按照順序生成組內的記錄編號,相同會重複,名次不會留下空的位置 開窗函式區別如下圖所示 f...
C語言高階 指標的高階 3
目錄 實踐之中不免會碰到陣列和指標作函式引數而如何設計形參的問題。一維陣列傳參,下列接收方式是否可行呢?1.void test int arr 2.void test int arr 10 3.void test int arr int main test arr return 0 1.陣列傳引數組...