spark中運算元詳解 aggregateByKey

2021-08-10 09:54:14 字數 1425 閱讀 5783

通過scala集合以並行化方式建立乙個rdd

scala> val pairrdd = sc.parallelize(list(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)

pairrdd 這個rdd有兩個區,乙個區中存放的是:

("cat",2),("cat",5),("mouse",4)

另乙個分割槽中存放的是:

("cat",12),("dog",12),("mouse",2)

然後,執行下面的語句

scala > pairrdd.aggregatebykey(100)(math.max(_ , _),  _ + _ ).collect

結果:

res0: array[(string,int)] = array((dog,100),(cat,200),(mouse,200))

下面是以上語句執行的原理詳解:

aggregatebykey的意思是:按照key進行聚合

第一步:將每個分區內key相同資料放到一起

分割槽一

("cat",(2,5)),("mouse",4)

分割槽二("cat",12),("dog",12),("mouse",2)

第二步:區域性求最大值

對每個分割槽應用傳入的第乙個函式,math.max(_ , _),這個函式的功能是求每個分割槽中每個key的最大值

這個時候要特別注意,aggregatebyke(100)(math.max(_ , _),_+_)裡面的那個100,其實是個初始值

在分割槽一中求最大值的時候,100會被加到每個key的值中,這個時候每個分割槽就會變成下面的樣子

分割槽一

("cat",(2,5,100)),("mouse",(4,100))

然後求最大值後變成:

("cat",100), ("mouse",100)

分割槽二("cat",(12,100)),("dog",(12.100)),("mouse",(2,100))

求最大值後變成:

("cat",100),("dog",100),("mouse",100)

第三步:整體聚合

將上一步的結果進一步的合成,這個時候100不會再參與進來

最後結果就是:

(dog,100),(cat,200),(mouse,200)

spark中運算元詳解 aggregateByKey

通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...

Spark運算元詳解

目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...

spark常用運算元詳解

1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....