先對區域性聚合,再對全域性聚合
示例:val rdd1 = sc.parallelize(list(1,2,3,4,5), 2)
檢視每個分割槽中的元素:
將每個分割槽中的最大值求和,注意:初始值是0;
如果初始值時候10,則結果為:30,因為在區域性操作和全域性操作的時候都要計算初始值
如果是求和,注意:初始值是0:
如果初始值是10,則結果是:45
乙個字串的例子:
val rdd2 = sc.parallelize(list("a","b","c","d","e","f"),2)
修改一下剛才的檢視分割槽元素的函式
def func2(index: int, iter: iterator[(string)]) : iterator[string] =
兩個分割槽中的元素:
[partid:0, val: a], [partid:0, val: b], [partid:0, val: c],
[partid:1, val: d], [partid:1, val: e], [partid:1, val: f]
執行結果:
更複雜一點的例子
val rdd3 = sc.parallelize(list("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).tostring, (x,y) => x + y)
兩個分割槽先計算出字串的最大長度,然後合成字串
結果可能是:」24」,也可能是:」42」,體現了並行化特點。
val rdd4 = sc.parallelize(list("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).tostring, (x,y) => x + y)
結果是:」10」,也可能是」01」,
原因:注意有個初始值」」,其長度0,然後0.tostring變成字串。值"0".tostring的長度為0,"0".tostring.length的長度為1 。分割槽可能為(「12」,「23」)和(「345」,「」);初始值為"",然後初始值和「12」,「34」比較,或者是""和「345」比較,然後和「」比較。
math.min("".length, "12".length ) 的結果是:0 , math.min("0".length, "23".length ) 的結果是1
math.min("".length, "345".length) 的結果是:0 , math.min("0".length, "".length) 的結果是:0
val rdd5 = sc.parallelize(list("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).tostring, (x,y) => x + y)
結果是:」11」,原因如下:
math.min("".length, "12".length ) 的結果是:0 , math.min("0".length, "23".length ) 的結果是:1
math.min("".length, "".length) 的結果是:0 , math.min("0".length, "345".length) 的結果是:1
注意:值"0".tostring的長度為0,"0".tostring.length的長度為1
Spark高階運算元練習(二)
package cn.allengao.exercise import org.apache.spark.object sparkrddtest3 執行結果 arraybuffer partid 0,val 1 partid 0,val 2 partid 0,val 3 partid 0,val 4...
spark學習 3 高階運算元
val rdd1 sc.parallelize list 1,2,3,4,5,6,7,8,9 2 rdd.aggregate 0 math.max seqop是作用於分割槽上的rdd,comop是通過操作seqop後再對其結果進行的操作 aggregatebykey 和aggregate運算元差不多...
spark運算元 五 action運算元
collect package com.doit.spark.demoday05 import org.apache.spark.sparkcontext author 向陽木 date 2020 09 22 22 19 description 將資料以陣列形式收集回driver端,資料按照分割槽編...