通過scala集合以並行化方式建立乙個rdd
scala> val pairrdd = sc.parallelize(list(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairrdd 這個rdd有兩個區,乙個區中存放的是:
("cat",2),("cat",5),("mouse",4)
另乙個分割槽中存放的是:
("cat",12),("dog",12),("mouse",2)
然後,執行下面的語句
scala > pairrdd.aggregatebykey(100)(math.max(_ , _), _ + _ ).collect
結果:
res0: array[(string,int)] = array((dog,100),(cat,200),(mouse,200))
下面是以上語句執行的原理詳解:
aggregatebykey的意思是:按照key進行聚合
第一步:將每個分區內key相同資料放到一起
分割槽一
("cat",(2,5)),("mouse",4)
分割槽二("cat",12),("dog",12),("mouse",2)
第二步:區域性求最大值
對每個分割槽應用傳入的第乙個函式,math.max(_ , _),這個函式的功能是求每個分割槽中每個key的最大值
這個時候要特別注意,aggregatebyke(100)(math.max(_ , _),_+_)裡面的那個100,其實是個初始值
在分割槽一中求最大值的時候,100會被加到每個key的值中,這個時候每個分割槽就會變成下面的樣子
分割槽一
("cat",(2,5,100)),("mouse",(4,100))
然後求最大值後變成:
("cat",100), ("mouse",100)
分割槽二("cat",(12,100)),("dog",(12.100)),("mouse",(2,100))
求最大值後變成:
("cat",100),("dog",100),("mouse",100)
第三步:整體聚合
將上一步的結果進一步的合成,這個時候100不會再參與進來
最後結果就是:
(dog,100),(cat,200),(mouse,200)
spark中運算元詳解 aggregateByKey
通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...
Spark運算元詳解
目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...
spark常用運算元詳解
1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....