scala> var rdd = sc.makeRDD(Array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[60] at makeRDD at <console>:24
scala> rdd.collect
res50: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,4), (b,5), (c,6), (c,7), (c,8), (c,9), (d,10))
scala> rdd.count()
res46: Long = 10
scala> rdd.first()
res45: (String, Int) = (a,1)
scala> rdd.reduce((x, y) => (x._1 + y._1, x._2 + y._2))
res49: (String, Int) = (aaccabbccd,55)
scala> rdd.take(2)
res51: Array[(String, Int)] = Array((a,1), (a,2))
scala> rdd.top(1)
res54: Array[(String, Int)] = Array((d,10))
scala> rdd.takeOrdered(1)
res56: Array[(String, Int)] = Array((a,1))
scala> rdd.takeOrdered(2)
res57: Array[(String, Int)] = Array((a,1), (a,2))
aggregate combines the elements of the RDD: seqOp first folds the T-type elements inside each partition into a value of type U, and combOp then merges the per-partition U results into a single U. Note that both seqOp and combOp also incorporate the zeroValue.
// Define an RDD with two partitions: the first holds 1,2,3,4,5 and the second holds 6,7,8,9,10
scala> var rdd = sc.makeRDD(1 to 10, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at makeRDD at <console>:24
// Inspect which elements end up in which partition
scala> rdd.mapPartitionsWithIndex {
     |   (partIdx, iter) => {
     |     var part_map = scala.collection.mutable.Map[String, List[Int]]()
     |     while (iter.hasNext) {
     |       val part_name = "part_" + partIdx
     |       part_map(part_name) = iter.next() :: part_map.getOrElse(part_name, List[Int]())
     |     }
     |     part_map.iterator
     |   }
     | }.collect
res59: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
// aggregate returns 58 here: (x: Int, y: Int) => x + y is first applied within each partition, using zeroValue 1,
// so part_0 computes 1+1+2+3+4+5 = 16 and part_1 computes 1+6+7+8+9+10 = 41.
// The partition results are then merged with (a: Int, b: Int) => a + b, again applying zeroValue: 1+16+41 = 58.
scala> rdd.aggregate(1)(
     |   (x: Int, y: Int) => x + y,
     |   (a: Int, b: Int) => a + b
     | )
res61: Int = 58
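The seqOp and combOp above both work on Int, but as noted in the description the intermediate type U may differ from the element type T. A small sketch of that (assumed, not from the book): aggregating the same 1-to-10 RDD into a (sum, count) pair so an average can be computed in one pass, using a zeroValue of (0, 0):

// seqOp: (U, T) => U folds each Int into a (sum, count) pair; combOp: (U, U) => U merges the pairs.
val (sum, cnt) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
val avg = sum.toDouble / cnt   // 5.5 for the values 1 to 10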
fold is similar to aggregate; the difference is that seqOp and combOp are the same function.
scala> rdd.fold(1)(
     |   (x, y) => x + y
     | )
res63: Int = 58
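As with aggregate, fold applies the zeroValue once inside every partition and once more when the partition results are merged, which is why the result here is 58 rather than 55. A quick illustrative check:

rdd.fold(0)((x, y) => x + y)   // 55: 0 is neutral for addition, so the extra applications do not matter
rdd.fold(1)((x, y) => x + y)   // 58: 1 is added in each of the 2 partitions and once more at the merge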
lookup applies to an RDD of (K, V) pairs and returns all of the V values associated with the specified key K.
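The original transcript for lookup appears to be missing here; a minimal sketch of its behaviour on a small pair RDD (names and values are illustrative):

val pairRdd = sc.makeRDD(Array(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
pairRdd.lookup("a")   // Seq(1, 2, 3): every value stored under key "a"
pairRdd.lookup("x")   // empty Seq: no values for this key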
countByKey counts the number of elements for each key K in an RDD[K, V].
scala> var rdd = sc.makeRDD(Array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at makeRDD at <console>:24
scala> rdd.countByKey()
res65: scala.collection.Map[String,Long] = Map(d -> 1, a -> 3, b -> 2, c -> 4)
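Note that countByKey brings the whole per-key count map back to the driver, so it is only suitable when the number of distinct keys is small; for large key sets the usual distributed alternative is reduceByKey. A sketch of that alternative (assumed, not from the original):

// Map every pair to (key, 1) and sum per key; the result stays distributed until collect.
rdd.map { case (k, _) => (k, 1L) }.reduceByKey(_ + _).collect
// Array((d,1), (a,3), (b,2), (c,4)), up to ordering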
foreach iterates over every element of the RDD and applies the function f to it. foreachPartition is similar to foreach, except that the function is applied once per partition (to an iterator over that partition) rather than once per element.
scala> var rdd = sc.makeRDD(Array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at makeRDD at <console>:24
scala> rdd.foreach(println)
(a,1)
(a,3)
(c,8)
(c,6)
(c,9)
(b,4)
(a,2)
(b,5)
(d,10)
(c,7)
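foreachPartition, mentioned above, hands the function an iterator over an entire partition, which is the natural place for per-partition setup such as opening a database connection. A minimal sketch (the println is only illustrative; as with foreach, the output is produced on the executors when running on a cluster):

rdd.foreachPartition { iter =>
  // Called once per partition; do any expensive setup here, then consume the iterator.
  println("partition size: " + iter.size)
}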
sortBy sorts the RDD's elements according to the key produced by the specified function f.
scala> var rdd = sc.makeRDD(Array(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6), ("c", 7), ("c", 8), ("c", 9), ("d", 10)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at makeRDD at <console>:24
scala> rdd.sortBy(x => x).collect
res68: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,4), (b,5), (c,6), (c,7), (c,8), (c,9), (d,10))
scala> rdd.sortBy(x => x, false).collect
res70: Array[(String, Int)] = Array((d,10), (c,9), (c,8), (c,7), (c,6), (b,5), (b,4), (a,3), (a,2), (a,1))
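The two calls above sort by the whole (key, value) tuple; the function passed to sortBy can instead pick out any part of each element, for example sorting by the numeric value in descending order. An illustrative sketch, not part of the original transcript:

rdd.sortBy(x => x._2, ascending = false).collect
// Array((d,10), (c,9), (c,8), (c,7), (c,6), (b,5), (b,4), (a,3), (a,2), (a,1)): ordered by the Int value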
References:
[1] 郭景瞻. 圖解Spark:核心技術與案例實戰 [M]. 北京: 電子工業出版社, 2017.