Spark Higher-Order Operator Practice (Part 2)


package cn.allengao.exercise

import org.apache.spark.{SparkConf, SparkContext}

object SparkRddTest3 {

def main(args: Array[String]): Unit = {

// Assumed setup carried over from Part 1: a local SparkContext and rdd8, nine integers in two partitions,
// inspected with mapPartitionsWithIndex so that every element is shown together with its partition ID.
val conf = new SparkConf().setAppName("SparkRddTest3").setMaster("local")
val sc = new SparkContext(conf)
val func = (index: Int, iter: Iterator[Int]) => {
iter.map(x => "[partID:" + index + ", val: " + x + "]")
}
val rdd8 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
val res1 = rdd8.mapPartitionsWithIndex(func)

/* Result: ArrayBuffer([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:0, val: 4],
[partID:1, val: 5], [partID:1, val: 6], [partID:1, val: 7], [partID:1, val: 8], [partID:1, val: 9]) */
// println(res1.collect().toBuffer)

val rdd9 = rdd8.repartition(3)

// Result: 3, the partition count becomes 3.

// println(rdd9.partitions.length)

// res2 (assumed, as in Part 1) re-inspects the partitions after the shuffle introduced by repartition.
val res2 = rdd9.mapPartitionsWithIndex(func)
/* Result: ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4],
[partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9]) */
// println(res2.collect().toBuffer)
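// Extra sketch (illustrative, not part of the original output): repartition(n) is simply
// coalesce(n, shuffle = true), so it can both increase and decrease the partition count.
// Expected result: 5
// println(rdd8.repartition(5).partitions.length)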

// coalesce: by default no shuffle is performed, so the partition count cannot grow (rdd8 would stay at 2);
// passing true shuffles the data and repartitions to the requested number.

val rdd10 = rdd8.coalesce(3, true)

// Result: 3
// println(rdd10.partitions.length)

// res3 (assumed, as in Part 1) inspects the partitions produced by coalesce with shuffle = true.
val res3 = rdd10.mapPartitionsWithIndex(func)
/* Result: ArrayBuffer([partID:0, val: 3], [partID:0, val: 7], [partID:1, val: 1], [partID:1, val: 4],
[partID:1, val: 5], [partID:1, val: 8], [partID:2, val: 2], [partID:2, val: 6], [partID:2, val: 9]) */
// println(res3.collect().toBuffer)
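// Extra sketch (rdd10b is an illustrative name): without shuffle, coalesce can only reduce the
// partition count, so asking the 2-partition rdd8 for 3 partitions leaves it at 2.
val rdd10b = rdd8.coalesce(3)
// Expected result: 2
// println(rdd10b.partitions.length)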

// collectAsMap: collects a pair RDD to the driver as a Map, e.g. Map(b -> 2, a -> 1)
val rdd11 = sc.parallelize(List(("a", 1), ("b", 2)))
val res4 = rdd11.collectAsMap
// Result: ArrayBuffer((a,1), (b,2))
// println(rdd11.collect().toBuffer)
// Result: Map(b -> 2, a -> 1)
// println(res4)
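// Extra sketch (rdd11b is an illustrative name): collectAsMap keeps only one value per key,
// so duplicate keys are collapsed when the pairs become a Map.
val rdd11b = sc.parallelize(List(("a", 1), ("a", 9), ("b", 2)))
// Expected result: a single entry for "a", e.g. Map(a -> 9, b -> 2)
// println(rdd11b.collectAsMap)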

// countByKey: counts how many pairs share each key
val rdd12 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1)))
val res5 = rdd12.countByKey

// countByValue: counts how many times each (key, value) pair occurs
val res6 = rdd12.countByValue

// Result: Map(a -> 1, b -> 2, c -> 2)
// println(res5)
// Result: Map((b,2) -> 2, (c,2) -> 1, (a,1) -> 1, (c,1) -> 1)
// println(res6)
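// Extra sketch (res6b is an illustrative name): countByKey returns a local Map on the driver;
// the same counts stay distributed if computed with reduceByKey instead.
val res6b = rdd12.mapValues(_ => 1).reduceByKey(_ + _)
// Expected result: ArrayBuffer((a,1), (b,2), (c,2)) (order may vary)
// println(res6b.collect().toBuffer)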

// filterByRange: keeps only the pairs whose key falls in the inclusive range ["b", "d"]
val rdd13 = sc.parallelize(List(("e", 5), ("c", 3), ("d", 4), ("c", 2), ("a", 1), ("b", 6)))
val res7 = rdd13.filterByRange("b", "d").collect()
// Result: ArrayBuffer((c,3), (d,4), (c,2), (b,6))
// println(res7.toBuffer)
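// Extra sketch (res7b is an illustrative name): if the RDD is range-partitioned first (e.g. by sortByKey),
// filterByRange can skip whole partitions instead of scanning every element.
val res7b = rdd13.sortByKey().filterByRange("b", "d").collect()
// Expected result: roughly ArrayBuffer((b,6), (c,3), (c,2), (d,4))
// println(res7b.toBuffer)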

// flatMapValues: applies a function to each value and flattens the results, keeping the key
val rdd14 = sc.parallelize(List(("a", "1 2"), ("b", "3 4")))
val res8 = rdd14.flatMapValues(_.split(" ")).collect()
// Result: ArrayBuffer((a,1), (a,2), (b,3), (b,4))
// println(res8.toBuffer)
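// Extra sketch (res8b is an illustrative name): mapValues alone keeps one pair per key with a
// collection as the value; flatMapValues is what flattens that collection back into pairs.
val res8b = rdd14.mapValues(_.split(" ").toList).collect()
// Expected result: ArrayBuffer((a,List(1, 2)), (b,List(3, 4)))
// println(res8b.toBuffer)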

// foldByKey: folds the values of each key, starting from the given zero value
val rdd15 = sc.parallelize(List("dog", "wolf", "cat", "bear"), 2)
val rdd16 = rdd15.map(x => (x.length, x))
// _ + _ concatenates the strings that share a key; the " " zero value is applied once per partition
val rdd17 = rdd16.foldByKey(" ")(_ + _)
// Result: ArrayBuffer((3,dog), (4,wolf), (3,cat), (4,bear))
// println(rdd16.collect().toBuffer)
// Result: ArrayBuffer((4, wolf bear), (3, dog cat))
// println(rdd17.collect().toBuffer)
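// Extra sketch (rddWords is an illustrative name): the same foldByKey pattern with a numeric
// zero value gives a word-count style sum per key.
val rddWords = sc.parallelize(List(("dog", 1), ("cat", 1), ("dog", 1)))
// Expected result: ArrayBuffer((dog,2), (cat,1)) (order may vary)
// println(rddWords.foldByKey(0)(_ + _).collect().toBuffer)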

// foreachPartition: an action that runs the function once per partition; it returns Unit and does not produce a new RDD
val rdd18 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val res9 = rdd18.foreachPartition(x => println(x.reduce(_ + _)))
// Result: 6 15 24 (printed from inside each partition; res9 itself is Unit)
// print(res9)
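// Extra sketch (res9b is an illustrative name): mapPartitions is the transformation counterpart;
// it returns a new RDD with one summed value per partition instead of printing.
val res9b = rdd18.mapPartitions(it => Iterator(it.sum)).collect()
// Expected result: ArrayBuffer(6, 15, 24)
// println(res9b.toBuffer)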

// keyBy: uses the passed-in function to build the key for each element
val rdd19 = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
// key by word length
val res10 = rdd19.keyBy(_.length).collect()
// key by first letter
val res11 = rdd19.keyBy(_(0)).collect()
// Result: ArrayBuffer((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
// println(res10.toBuffer)
// Result: ArrayBuffer((d,dog), (s,salmon), (s,salmon), (r,rat), (e,elephant))
// println(res11.toBuffer)
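// Extra sketch (res10b is an illustrative name): keyBy(f) is shorthand for map(x => (f(x), x)),
// so this produces the same pairs as res10 above.
val res10b = rdd19.map(x => (x.length, x)).collect()
// println(res10b.toBuffer)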

// keys / values: extract the keys or the values of a pair RDD
val rdd20 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val rdd21 = rdd20.map(x => (x.length, x))
val res12 = rdd21.keys.collect
val res13 = rdd21.values.collect
// Result: ArrayBuffer(3, 5, 4, 3, 7, 5)
println(res12.toBuffer)
// Result: ArrayBuffer(dog, tiger, lion, cat, panther, eagle)
println(res13.toBuffer)
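// Extra sketch: keys and values are shorthand for map(_._1) and map(_._2) respectively.
// println(rdd21.map(_._1).collect().toBuffer)
// println(rdd21.map(_._2).collect().toBuffer)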

}
}
