spark action常用運算元型別如下:
1.collectasmap():map[k, v]:二元組rdd轉為map資料型別
countbykey(): map[k, long]:統計rdd中每個key出現的次數,還回map型別表示每個key出現了幾次
countbyvalue(): map[t, long]:統計rdd中每個元素出現的次數,還回map型別表示每個元素出現了幾次
val rdd2: rdd[(string, int)] = sc.parallelize(list(("a", 21), ("b", 2), ("c", 3), ("a", 3), ("d", 21), ("e", 21)), 2)
// action: collectasmap
val kv: map[string, int] = rdd2.collectasmap()
println(kv)
// countbykey
val keycount = rdd2.countbykey()
println(keycount)
// countbyvalue
val valuecount = rdd2.countbyvalue()
println(valuecount)
/**map(e -> 21, b -> 2, d -> 21, a -> 3, c -> 3)
map(e -> 1, a -> 2, b -> 1, c -> 1, d -> 1)
map((c,3) -> 1, (b,2) -> 1, (e,21) -> 1, (a,3) -> 1, (a,21) -> 1, (d,21) -> 1)
*/
2.foreach(f: t => unit): unit:迴圈遍歷rdd中每乙個元素
foreachpartition(f: iterator[t] => unit): unit:該運算元與foreach運算元類似,遍歷處理元素資料時可共享分區內資源,當需要額外物件資料時foreachpartition運算元比foreach效率高。
val rdd = sc.parallelize(list(1, 2, 3, 4, 5, 6), 2)
rdd.foreach(println(_))
rdd.foreachpartition
println(value+iterator.reduce(_*_))
}
3.aggregate[u: classtag](zerovalue: u)(seqop: (u, t) => u, combop: (u, u) => u): u:運算元中引數zerovalue是初始值;seqop是分區內task的執行邏輯,在分區內zerovalue與第乙個元素按業務邏輯聚集再依次與剩餘元素聚集,還回與zerovalue同樣資料型別;combop是聚集各分割槽的業務邏輯操作,起始也是由zerovalue與第乙個分割槽聚集結果按業務邏輯聚集,再依次與剩餘分割槽聚集結果依次聚集,輸入與輸出資料型別都與zerovalue一致。
/**
* 分區內初始值zerovalue與每個元素依次聚集
** @param zerovalue
* @param value
* @return
*/def seqop(zerovalue: arraybuffer[int], value: int): arraybuffer[int] =
/*** 各個分割槽聚集後的結果再依次聚集
** @param a
* @param b
* @return
*/def combop(a: arraybuffer[int], b: arraybuffer[int]): arraybuffer[int] =
val rdd = sc.parallelize(list(1, 2, 3, 4, 5, 6), 2)
val result = rdd.aggregate(arraybuffer[int](88))(seqop, combop)
println("------------------------------------")
println(result)
/**結果如下:
zerovalue:arraybuffer(88) value:1
zerovalue:arraybuffer(88, 1) value:2
zerovalue:arraybuffer(88, 1, 2) value:3
zerovalue:arraybuffer(88) value:4
zerovalue:arraybuffer(88, 4) value:5
zerovalue:arraybuffer(88, 4, 5) value:6
a:arraybuffer(88) b:arraybuffer(88, 1, 2, 3)
a:arraybuffer(88, 88, 1, 2, 3) b:arraybuffer(88, 4, 5, 6)
------------------------------------
arraybuffer(88, 88, 1, 2, 3, 88, 4, 5, 6)
說明:zerovalue先在每個分割槽與元素依次聚集,zerovalue再與各個分割槽結果依次聚集,故2個分割槽結果有3個88
*/
未完待續 spark運算元 五 action運算元
collect package com.doit.spark.demoday05 import org.apache.spark.sparkcontext author 向陽木 date 2020 09 22 22 19 description 將資料以陣列形式收集回driver端,資料按照分割槽編...
Spark 常用運算元
官網rdd操作指南 2 key value資料型別的transfromation運算元 三 連線 3 action運算元 val list list 1 2,3 sc.parallelize list map 10 foreach println 輸出結果 10 20 30 這裡為了節省篇幅去掉了換...
Spark常用運算元練習
package cn.allengao.exercise import org.apache.spark.class name package describe sparkrdd運算元練習 creat user allen gao creat date 2018 1 25 creat time 10...