Spark(二)運算元詳解

2022-02-23 13:59:04 字數 2971 閱讀 4643

目錄基於上次的wordcount,我們來寫乙個wordcountcount,來對wc程式進行第二次計數,我們來分析一下效能。

package com.littlepage.wc

import org.apache.spark.rdd.rdd

import org.apache.spark.

object wordcount )

val pl:rdd[(int,int)] = rev.reducebykey(_+_)

println("\nwordcountcount")

pl.foreach(println)

thread.sleep(100000000)

}}

通過效能圖,我們可以知道:

1.spark如果不對其結果進行儲存或輸出,那麼spark將不會處理map或者reduce操作

2.如果進行重複輸出,共用的map或者reduce操作只執行一次

3.預設如果產生一次shuffle是去檢視圖表的一次拐彎,為了儘量減少效能的消耗,編寫程式時應該儘量減少shuffle的次數

spark程式設計模型和mapreduce相比,spark可以多個job,多個state進行執行。

1.三個必備運算元

我們在寫乙個spark程式中,不可避免的運算元有三個,建立運算元,轉換運算元,收集運算元。

建立運算元可以建立乙個rdd資料集,這個建立可以在記憶體中(集合容器),也可以在硬碟中(檔案)獲取

轉換運算元可以處理乙個rdd資料集,即map和reduce操作,都算做轉換運算元。

收集運算元我們在寫乙個rdd資料集的時候,必須使用收集運算元進行收集,否則不會觸發shuffle。

示例,三個運算元寫乙個過濾數字程式。

package com.littlepage

import org.apache.spark.rdd.rdd

import org.apache.spark.

object demo2

}

package com.littlepage

import org.apache.spark.rdd.rdd

import org.apache.spark.

object demo2

}

2.常見運算元(交並差笛卡爾,cogroup,join)

2.1.union運算元

將兩個資料集合並為乙個資料集,直接合併,不會產生shuffle

object union 

}

2.2.intersection運算元

將2個資料集取交集,產生乙個shuffle

val interdata:rdd[int] = rdd1.intersection(rdd2)
2.3.substract運算元

將2個資料集取差集,產生乙個shuffle

val subdata:rdd[int] = rdd1.substract(rdd2)
2.4.cartesian運算元

將2個資料集取笛卡爾積,不產生shuffle

val cartesiandata:rdd[int] = rdd1.cartesian(rdd2)
2.5.cogroup運算元

兩個分組進行,key作為結果的key,value集合進行乙個二元祖,包含兩個分割槽的元素,產生乙個shuffle。

val rdd1:rdd[(string,int)] = sc.parallelize(list(

("zhangsan",11),

("zhangsan",12),

("lisi",13),

("wangwu",14)

));val rdd2:rdd[(string,int)] = sc.parallelize(list(

("zhangsan",21),

("zhangsan",22),

("lisi",23),

("zhaoliu",28)

))val cogroupdata:rdd[(string,(iterable[int],iterable[int]))] = rdd1.cogroup(rdd2)

6.join,leftouterjoin,rightouterjoin,fullouterjoin運算元

val joindata:rdd[(string,(int,int))] = rdd1.join(rdd2)

val leftdata:rdd[(string,(int,option[int]))] = rdd1.leftouterjoin(rdd2)

val rightdata:rdd[(string,(option[int],int))] = rdd2.rightouterjoin(rdd2)

val fulldata:rdd[(string,(option[int],option[int]))] = rdd1.fullouterjoin(rdd2)

3.排序和聚合計算

3.1.swap運算元

將乙個k-v資料集的key和value交換,用法

data.map(_.swap)
3.2.sort運算元

sort運算元可以將按照key進行全排序

data.sortbykey()
3.3.take運算元

獲得資料的前n個,n為乙個整型

data.take(n)
3.4.distinct去重

去除key相同的

val keys:rdd[(string,string) = map.distinct()

Spark運算元詳解

目錄 spark常用運算元詳解 3.getnumpartitions 4.partitions 5.foreachpartition 6.coalesce 7.repartition 8.union,zip,join 9.zipwithindex,zipwithuniqueid 未完待續.本文主要介...

spark常用運算元詳解

1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....

spark中運算元詳解 aggregateByKey

通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...