package test
import org.apache.spark.rdd.rdd
import org.apache.spark.
import scala.util.random
object testforyiele
//建立乙個rdd,型別是int
val listrdd: rdd[
int]
= context.makerdd(res)
//map運算元,例如在listrdd集合中每個元素的字尾加入字串abc
val maprdd: rdd[
string
]= listrdd.map(_ +
"abc"
) maprdd.collect(
).foreach(datas => print(datas +
"\t"))
}}
val listrdd: rdd[array[
any]
]= context.makerdd(array(array(
"a",1,
2,45)
,array(4,
7,18,
"oop"
,"om"))
)//注意型別 rdd[any]一維與rdd[array[any]]二維達到了降維效果,這運算元也叫扁平化操作
val flatmaprdd: rdd[
any]
= listrdd.flatmap(x=>x)
val listrdd: rdd[
any]
= context.makerdd(list(
"sbv"
,"om"
,"shuffle",3
,4))
// f: iterator[t] => iterator[u]
// 遍歷每乙個分割槽(將乙個乙個分割槽看作乙個整體進行邏輯處理)
// 效率比map運算元要快,隱患是可能會記憶體溢位
val partitionsrdd: rdd[
string
]"s"
))
//生成6個[0,9]之間的整數
var res =
for(i <-
1 to 6
)yield
val listrdd: rdd[
any]
= context.makerdd(res,2)
//f: (int, iterator[t]) => iterator[u],獲取資料在哪乙個分割槽,
//二元組,引數一是分割槽號(int型別),引數二是資料(iterator[int]型別,根據listrdd的rdd型別推斷
val partitionswithindexrdd: rdd[
(any
,string)]
case
(nums, datas)
=>
datas.map(
(_,"分割槽:"
+ nums)
)}
輸出:
(7,分割槽:0)
(9,分割槽:0)
(1,分割槽:0)
(0,分割槽:1)
(5,分割槽:1)
(4,分割槽:1)
4)glom
glom函式將每個分割槽形成乙個陣列,內部實現是返回的glommedrdd。
//生成20個[0,9]之間的整數
var res =
for(i <-
1 to 20
)yield
val listrdd: rdd[
int]
= context.makerdd(res)
//將乙個分割槽的資料放入到array陣列中
val glomrdd: rdd[array[
int]
]= listrdd.glom(
)glomrdd.collect(
).foreach(datas=>println(datas.mkstring(
",")
))
輸出:
8,31,2,8
1,00,2,1
5,09,4,3
2,76,5,2
7) groupby
groupby :將元素通過函式生成相應的 key,資料就轉化為 key-value 格式,之後將 key 相同的元素分為一組。
//隨機生成30個[0-99]之間的整數並以vector集合型別進行返回給res
val res =
for(i <-
1 to 30
)yield
val listrdd: rdd[
int]
= context.makerdd(res)
//按照指定的規則進行分組,將listrdd集合中每乙個元素模以5,結果相同則視為乙個key
//返回乙個二元組 int 型別指的是返回模以5的結果,iterable[int])指的是返回相同key(模以5的結果)的集合
val grouprdd: rdd[
(int
, iterable[
int])]
= listrdd.groupby(x => x %5)
grouprdd.collect(
).foreach(println)
輸出:
(0,compactbuffer(15, 85, 70))
(1,compactbuffer(41, 6, 31, 41, 96, 46))
(2,compactbuffer(37, 7, 82, 37, 17, 72, 57))
(3,compactbuffer(8, 98, 98, 78, 8))
(4,compactbuffer(54, 34, 4, 24, 54, 59, 44, 44, 94))
8) filter
filter 函式功能是對元素進行過濾,對每個 元 素 應 用 f 函 數, 返 回 值 為 true 的 元 素 在rdd 中保留,返回值為 false 的元素將被過濾掉。 內 部 實 現 相 當 於 生 成 filteredrdd(this,sc.clean(f))。
val listrdd: rdd[string] = context.makerdd(array(「tom」,「lisa」,「python」,「sparck」,「scala」))
//將listrdd集合中包含s字元取出,其他過濾掉
val filterrdd: rdd[string] = listrdd.filter(x => x.contains(「s」))
filterrdd.collect().foreach(println)
輸出:sparck
scala
9)distinct
distinct將rdd中的元素進行去重操作.
val listrdd: rdd[
any]
= context.makerdd(array(2,
1,3,
2,1,
"tom"
,"python"
,"python"))
val distinctrdd: rdd[
any]
= listrdd.distinct(
) distinctrdd.collect(
).foreach(println)
輸出:1
tom2
3python
結果貌似與我們想得不一樣,distinct操作有shuffle的讀寫操作,那就有資料重組(洗牌)操作,實際上我們有這個轉換操作時,它必須等待所有分割槽的資料全部讀完,才會進行這個轉換操作,不同於其他轉換運算元,map運算元不需要等待下乙個分割槽,可以這樣說map運算元有並行機制,但distinct運算元沒有並行機制(我們也必須等待所有分割槽的資料讀完,才能去重)
11) sample
sample 將 rdd 這個集合內的元素進行取樣,獲取所有元素的子集。使用者可以設定是否有放回的抽樣、百分比、隨機種子,進而決定取樣方式。內部實現是生成 sampledrdd(withreplacement, fraction, seed)。
val listrdd: rdd[
int]
= context.makerdd(
1 to 20
)//隨機抽樣,引數一,是否放回資料,true放回,false不放回
//fraction引數並不是總體的百分比量,但它的引數越大,抽樣數量越大,
//seed表示乙個種子(這點可以再次執行會和上一次執行結
//果相同,執行幾次也是一樣,偽隨機數)
val samplerdd: rdd[
int]
= listrdd.sample(
false
,0.5,5
)
sparkStreaming轉換運算元
map 集群 nc 埠 9000 可以修改 替換 字 flatmap 切分壓平 filter repartition union合併 local 當只有兩個的時候 只有乙個分割槽 另乙個處理資料集 count reduce join 和 cogroup用兩個佇列join 以上運算元都是無狀態的 各處...
RDD轉換運算元和行動運算元的區別
textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...
RDD運算元怎麼區分轉換運算元和行動運算元
textfile 既不是transformation 也不是 action 它是為生成rdd前做準備 運算元 指的就是rdd上的方法。spark中的運算元分為2類 1 轉換運算元 transformation 由rrd 呼叫方法 返回乙個新的rdd 一直存在drive中因為沒生成task 特點 生成...