Spark運算元詳解

2021-08-31 16:07:07 字數 3228 閱讀 9168

目錄

spark常用運算元詳解

3. getnumpartitions

4. partitions

5. foreachpartition

6. coalesce

7. repartition

8. union,zip,join

9. zipwithindex,zipwithuniqueid

未完待續...

本文主要介紹工程中常用的spark運算元以及其用法。

//功能:遍歷partition

param ://引數是乙個函式

f :param : iterator[t] //函式引數是乙個集合,代表乙個partition的資料

return : iterator[t] //返回值也是乙個集合

示例**

println("建立鏈結。。。")

x.foreach(x=>)

println("提交。。。")

x}).count();

示例**

println("partitionid" + index);

iterator.foreach(println)

println("-------------")

iterator

}).count()

獲得partition的數量

示例**

println(rdd.getnumpartitions)
獲取所有的partition,返回值是乙個partition型別的陣列

示例**

rdd.partitions.foreach(println)
示例**

rdd.foreachpartition(x=>)
def coalesce(numpartitions: scala.int, shuffle: scala.boolean = )(implicit ord: scala.ordering[t] = ): org.apache.spark.rdd.rdd[t] =
重新分割槽方法,引數numpartitions是分割槽數;shuffle是否產生shuffle,true代表產生shuffle,false則不產生shuffle。

返回值:重新分割槽後的rdd。需要注意的是

如果分割槽數增多了而shuffle設定為false,那麼分割槽數將不會改變

分割槽數減少則不會產生shuffle,即便設定了shuffle為true。他會在邏輯上將資料合併

示例**

rdd.coalesce(2,true)

//如果分割槽數增多了而shuffle設定為false,那麼分割槽數將不會改變

//分割槽數減少則不會產生shuffle,即便設定了shuffle為true。他會在邏輯上將資料合併

def repartition(numpartitions: scala.int)(implicit ord: scala.ordering[t] = ): org.apache.spark.rdd.rdd[t] =
重新分割槽方法,引數是分割槽數,返回值是重新分割槽後的rdd

這個方法會產生shuffle,底層呼叫的是coalesce(numpartitions,true)

示例**】:

rdd.repartition(3)
union(other:rdd[t])是資料合併,返回乙個新的資料集,由原資料集和otherdataset聯合而成。

示例**:

val rdd1 = sc.parallelize(1 to 3)

val rdd2 = sc.parallelize(4 to 6)

rdd1.union(rdd2).foreach(println)

res: 12

3456

相較於union,zip(other:rdd[t])則是橫向合併,針對相同位置進行合併,如果左右元素數量不同,報錯!如果左右元素數量相同,分割槽數不一致,也會報錯!

示例**:

val rdd = sc.makerdd(1 to 3, 2)

val rdd2 = sc.makerdd(4 to 6, 2)

rdd.zip(rdd2).zip(rdd2).foreach(println)

res:

((1,4),4)

((2,5),5)

((3,6),6)

相同的,橫向合併還有join運算元,它是針對相同key進行合併。將輸入資料集(k,v)和另外乙個資料集(k,w)進行join,得到(k, (v,w));該操作是對於相同k的v和w集合進行笛卡爾積操作;

zipwithindex是將內容和索引作為二元組返回,不過他是將索引作為value,內容為key返回的。

示例**:

val rdd = sc.makerdd(1 to 3, 2)

rdd.zipwithindex().foreach(println)

res:

(1,0)

(2,1)

(3,2)

可以使用swap將kv翻**

rdd.zipwithindex()

.map(_.swap)

.foreach(println)

res:

(0,1)

(1,2)

(2,3)

zipwithuniquei

spark常用運算元詳解

1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....

Spark(二)運算元詳解

目錄基於上次的wordcount,我們來寫乙個wordcountcount,來對wc程式進行第二次計數,我們來分析一下效能。package com.littlepage.wc import org.apache.spark.rdd.rdd import org.apache.spark.object...

spark中運算元詳解 aggregateByKey

通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...