combinebykey(createcombiner,mergevalue,mergecombiners,partitioner,maosidecombine)
createcombiner:在第一次遇到key時建立組合器函式,將rdd資料集中的v型別轉換成c型別(v=>c);
mergevalue:合併值函式,同時在遇到相同的key時,createcombiner的c型別與這次傳入的v型別值合併成乙個c型別值(c,v)=>c
mergecombiner:合併組合器函式,將c型別值兩兩合併成乙個c型別的值
partitioner:使用已有的或者自定義的分割槽函式,預設hashpartition
mapsidecombine:是否在map端進行combine操作,預設為true
一些銷售資料,以《公司,當月收入》的形式儲存,通過combinebykey的操作求出每個公司的總收入和月平均收入
val data=sc.parallelize(array((
"company_1",20),(
"company_2",16),(
"company_4",28),(
"company_3",45),(
"company_2",20),(
"company_1",28),(
"company_3",20),(
"company_4",20))
)val res=data.combinebykey((income)
=>
(income,1)
,(acc:
(int,int)
,income)
=>
(acc._1+income,acc._2+1)
,(acc1:
(int,int)
,acc2:
(int,int))
=>
(acc1._1+acc2._1,acc1._2+acc2._2)).map(
)
執行data.combinebykey時,首先,取出data中的第乙個rdd元素,(「company_1」,20)
key是company_1,這個key第一次遇到,因此spark會為這個key建立乙個組合器函式 createcombiner,負責把value從v型別轉化成c型別,這裡createcombiner的值是一 個匿名函式,即(income)=>(income,1),系統會把company_1這個key對應的value賦值 給income,也就是income=20.然後將income轉換成乙個元組(income,1)。然後是第二個rdd元素(「company_2」,16),也是為這個新的key建立乙個createcombiner.當遇到相同key時,系統會根據mergevalue所提供的合併值函式,將createcombiner的c型別值和這次傳入的v型別值合併成乙個新的c型別值。即(acc:(int,int),income)=>(acc._1+income,acc._2+1),例如當(「company_1」,28)被傳入時,系統會把28這個值賦值給income,把以前得到的(20,1)這個型別賦值給acc。acc._1+income就是20+28,acc._2+1就是將原來(20,1)中的1再加上1.從而得到(48,2)。
由於rdd被分成了多個分割槽,實際應用中分割槽可能再不同的機器上,因此需要mergecombiners對不同機器上的結果進行彙總。這裡(acc1:(int,int),acc2:(int,int))=>(acc1._1+
acc2._1,acc1._2+acc2._2)),是將兩個c型別進行合併,得到乙個新的c型別。map()是指求出每個公司的總收入和平均收入。
輸入給map的每個rdd類似於(「company_1」,(48,2))這種形式
實驗結果如下:
給定一組鍵值對(「spark」,2),(「hadoop」,6),(「hadoop」,4),(「spark」,6),key表示圖書名,value表示平均銷量,求出每種圖書每天平均銷量
執行語句如下:
val rdd=sc.parallelize(array((
"spark",2),(
"spark",2),(
"hadoop",2),(
"hadoop",6),(
"hadoop",4),(
"spark",6))
)rdd.mapvalues(x=
>
(x,1)).reducebykey((x,y)
=>
(x._1+y._1,x._2+y._2))
.mapvalues(x=
>
(x._1/x._2)).collect(
)
解析:通過mapvalues(x=>(x,1))會把rdd中每乙個元素都取出來,並把該元素中的value轉化成乙個元組(x,1),例如(「spark」,2)中value值2轉化為(2,1),因而此時rdd儲存的資料變成了(「spark」,(2,1))。
reducebykey((x,y)=>(x._1+y._1,x._2+y._2))會把相同key值的value進行聚合計算。 mapvalues(x=>(x._1/x._2))對rdd中的每個元素的value執行變化。
spark學習 RDD程式設計
rdd建立 從從檔案系統中載入資料建立rdd 1.spark採用textfile 從檔案系統中載入資料建立rdd 可以使本地,分布式系統等 2.把檔案的url作為引數 可以是本地檔案系統的位址,分布式檔案系統hdfs的位址等等 從本地檔案中載入資料 sc為系統自動建立的sparkcontext,不用...
Spark學習 RDD程式設計基礎
spark上開發的應用程式都是由乙個driver programe構成,這個所謂的驅動程式在spark集群通過跑main函式來執行各種並行操作。集群上的所有節點進行平行計算需要共同訪問乙個分割槽元素的集合,這就是rdd rdd resilient distributed dataset 彈性分布式資...
Spark學習進度 RDD運算元
需求 資料格式如下 在 spark 中,其實最終 job3 從邏輯上的計算過程是 job3 job1.map filter,整個過程是共享記憶體的,而不需要將中間結果存放在可靠的分布式檔案系統中 線性回歸 val points sc.textfile map persist val w random...