**:
通過scala集合以並行化方式建立乙個rdd
scala> val
pairrdd = sc.parallelize(list(("cat"
,2),("cat"
,5),("mouse"
,4),("cat"
,12),("dog"
,12),("mouse"
,2)),2
)
pairrdd 這個rdd有兩個區,乙個區中存放的是:
("cat"
,2),("cat"
,5),("mouse"
,4)
另乙個分割槽中存放的是:
("cat"
,12),("dog"
,12),("mouse"
,2)
然後,執行下面的語句
scala > pairrdd.aggregatebykey
(100
)(math.max
(_ , _), _ + _ ).collect
結果:
res0: array
[(string
,int
)] = array
((dog,100
),(cat,200
),(mouse,200
))
下面是以上語句執行的原理詳解:
aggregatebykey的意思是:按照key進行聚合
第一步:將每個分區內key相同資料放到一起
第二步:區域性求最大值分割槽一
("cat"
,(2,5
)),("mouse",4)
分割槽二("cat"
,12),("dog"
,12),("mouse"
,2)
對每個分割槽應用傳入的第乙個函式,math.max(_ , _),這個函式的功能是求每個分割槽中每個key的最大值
這個時候要特別注意,aggregatebyke(100)(math.max(_ , _),_+_)裡面的那個100,其實是個初始值
在分割槽一中求最大值的時候,100會被加到每個key的值中,這個時候每個分割槽就會變成下面的樣子
分割槽一
("cat"
,(2,5
,100
)),("mouse"
,(4,100
))然後求最大值後變成:
("cat"
,100
), ("mouse"
,100
)分割槽二
("cat"
,(12
,100
)),("dog"
,(12.100
)),("mouse"
,(2,100
))求最大值後變成:
("cat"
,100
),("dog"
,100
),("mouse"
,100
)
第三步:整體聚合
將上一步的結果進一步的合成,這個時候100不會再參與進來
最後結果就是:
(dog,100
),(cat,200
),(mouse
,200
)
spark中運算元詳解 aggregateByKey
通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...
Spark運算元詳解
目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...
spark常用運算元詳解
1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....