spark學習 3 高階運算元

2021-10-14 08:52:04 字數 1489 閱讀 8626

val rdd1=sc.parallelize(list(1,2,3,4,5,6,7,8,9),2)

rdd.aggregate(0)(math.max(_,_),_+_)

seqop是作用於分割槽上的rdd,comop是通過操作seqop後再對其結果進行的操作

aggregatebykey:和aggregate運算元差不多,只不過操作的是的資料型別,seqop是對分割槽中同樣的key做的操作的結果再用comop進行操作。

示例:1、準備帶有(key,value)的分割槽資料

val pairrdd=sc.parallelize(list(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
2、檢視一下分割槽後的資料是什麼樣的。

定義乙個分割槽操作函式

def func3(index:int,iter:iterator[(string,int)]):iterator[string]=
資料顯示為:

array[string] = array(

partid:0 ,value:(cat,2), partid:0 ,value:(cat,5), partid:0 ,value:(mouse,4),

partid:1 ,value:(cat,12), partid:1 ,value:(dog,12), partid:1 ,value:(mouse,2)

)

3、使用aggregatebykey函式進行操作

(1)將每個動物園中動物數最多個數進行求和

pairrdd.aggregatebykey(0)(math.max(_,_),_+_)
結果是:array[(string, int)] = array((dog,12), (cat,17), (mouse,6))

(2)求每種動物的和

pairrdd.aggregatebykey(0)(_+_,_+_).collect
結果是:res28: array[(string, int)] = array((dog,12), (cat,19), (mouse,6))

總結:我認為其實在做aggregatebykey的時候,其實做了一次區域性reducebykey,

形成了分割槽0聚合後的資料為:(「cat」,(2,5)),(「mouse」,(4))

分割槽1聚合後的資料為:(「cat",(12)),(「dog」,(12)),(「mouse」,(2))

aggregatebykey初始化值0,其實往value中新增的,比如分割槽0聚合後的資料為

(「cat」,(2,5,0)),(「mouse」,(4,0)),分割槽1聚合後的資料為(「cat",(12,0)),(「dog」,(12,0)),(「mouse」,(2,0)),隨後seqop中操作的和comop操作的資料其實均為value

Spark高階運算元練習(二)

package cn.allengao.exercise import org.apache.spark.object sparkrddtest3 執行結果 arraybuffer partid 0,val 1 partid 0,val 2 partid 0,val 3 partid 0,val 4...

Spark高階運算元aggregate所遇到的坑

先對區域性聚合,再對全域性聚合 示例 val rdd1 sc.parallelize list 1,2,3,4,5 2 檢視每個分割槽中的元素 將每個分割槽中的最大值求和,注意 初始值是0 如果初始值時候10,則結果為 30,因為在區域性操作和全域性操作的時候都要計算初始值 如果是求和,注意 初始值...

Spark學習進度 RDD運算元

需求 資料格式如下 在 spark 中,其實最終 job3 從邏輯上的計算過程是 job3 job1.map filter,整個過程是共享記憶體的,而不需要將中間結果存放在可靠的分布式檔案系統中 線性回歸 val points sc.textfile map persist val w random...