1.map()
接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。
eg: val rdd = sc.parallelize
(array(1, 2, 3, 4), 1)
rdd.map(x => x*x).foreach(println) //x => x*x:將元素x做平方處理,scala語句
sparkcontext.parallelize()---建立rdd,並分割槽
map(x => x*x):對所有的元素進行括號內的處理
2.filter()
接收乙個函式,對於rdd中的每乙個元素執行此函式操作,留下結果為true的項。
eg: val rdd = sc.parallelize
(array(1, 2, 3, 4), 1)
rdd.filter(_>2).foreach(println) //_代表任意元素,該函式表示判定元素是否大於2
3.flatmap()
接收乙個函式,對於rdd中的每乙個元素執行此函式操作,和map差不多,只是對於每個輸入項返回結果有多個,類似一對多對映。
eg: sc.textfile(「test.txt」)
.flatmap(_.split(」 「)) // 一對多對映,每行處理後返回多個單詞
.map((_, 1)) // 每個單詞對映成pairrdd
.reducebykey(+)
.collect()
.foreach(println)
flatmap(_.split(」 「)) //_表示任意元素,也就是把每一行當做乙個元素,然後進行分割,返回多個單詞
4.union()
合併兩個rdd。
5.intersection
求兩個rdd的交集
6.subtract
求兩個rdd的差集
7.cartesian
求兩個rdd的笛卡兒積
action運算元:
8.reduce
接收兩個同型別的元素,返回乙個同樣型別的元素 val rdd = sc.parallelize(array(1, 2, 3, 4), 1)
rdd.reduce(+) // 求和
9、fold
和reduce類似,有初始值 rdd.fold(0)((x, y) => x+y) // 求和
10、aggregate
和fold一樣也有初始值,但型別可以和rdd的型別不同,利用aggregate來去重
val rdd = sc.parallelize(array(1, 2, 2, 3, 4, 4, 4, 5), 1)
rdd.aggregate(collection.mutable.setint)((set, x) => set+=x, (set1, set2) => set1++set2)
.foreach(println)
這裡初始值是乙個空集合set,首先在本地進行聚合,也就是將元素放進set去重。
之後在不同的結點上的進行聚合,也就是set和set進行合併。
先在本地聚合有助於減少shuffle的量,減少下乙個stage進行計算量。
reducebykey,combinebykey,aggregatebykey都有這種效果。
11、cache和persist
持久化
12、repartition和coalesce
repartition是coalesce中shuffle引數為true時的實現
需要稍微減少分割槽可以用coalesce這樣就可以減少shuffle
如果要增加分割槽或者分割槽減少到1或特別少的時候,可以使用repartition或coalesce(1, true)
Spark運算元詳解
目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...
Spark 常用運算元
官網rdd操作指南 2 key value資料型別的transfromation運算元 三 連線 3 action運算元 val list list 1 2,3 sc.parallelize list map 10 foreach println 輸出結果 10 20 30 這裡為了節省篇幅去掉了換...
Spark(二)運算元詳解
目錄基於上次的wordcount,我們來寫乙個wordcountcount,來對wc程式進行第二次計數,我們來分析一下效能。package com.littlepage.wc import org.apache.spark.rdd.rdd import org.apache.spark.object...