目錄
spark常用運算元詳解
3. getnumpartitions
4. partitions
5. foreachpartition
6. coalesce
7. repartition
8. union,zip,join
9. zipwithindex,zipwithuniqueid
未完待續...
本文主要介紹工程中常用的spark運算元以及其用法。
//功能:遍歷partition
param ://引數是乙個函式
f :param : iterator[t] //函式引數是乙個集合,代表乙個partition的資料
return : iterator[t] //返回值也是乙個集合
示例**:
println("建立鏈結。。。")
x.foreach(x=>)
println("提交。。。")
x}).count();
示例**:
println("partitionid" + index);
iterator.foreach(println)
println("-------------")
iterator
}).count()
獲得partition的數量
示例**:
println(rdd.getnumpartitions)
獲取所有的partition,返回值是乙個partition型別的陣列
示例**:
rdd.partitions.foreach(println)
示例**:
rdd.foreachpartition(x=>)
def coalesce(numpartitions: scala.int, shuffle: scala.boolean = )(implicit ord: scala.ordering[t] = ): org.apache.spark.rdd.rdd[t] =
重新分割槽方法,引數numpartitions是分割槽數;shuffle是否產生shuffle,true代表產生shuffle,false則不產生shuffle。
返回值:重新分割槽後的rdd。需要注意的是:
如果分割槽數增多了而shuffle設定為false,那麼分割槽數將不會改變
分割槽數減少則不會產生shuffle,即便設定了shuffle為true。他會在邏輯上將資料合併
示例**:
rdd.coalesce(2,true)
//如果分割槽數增多了而shuffle設定為false,那麼分割槽數將不會改變
//分割槽數減少則不會產生shuffle,即便設定了shuffle為true。他會在邏輯上將資料合併
def repartition(numpartitions: scala.int)(implicit ord: scala.ordering[t] = ): org.apache.spark.rdd.rdd[t] =
重新分割槽方法,引數是分割槽數,返回值是重新分割槽後的rdd
這個方法會產生shuffle,底層呼叫的是coalesce(numpartitions,true)
【示例**】:
rdd.repartition(3)
union(other:rdd[t])是資料合併,返回乙個新的資料集,由原資料集和otherdataset聯合而成。
示例**:
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)
rdd1.union(rdd2).foreach(println)
res: 12
3456
相較於union,zip(other:rdd[t])則是橫向合併,針對相同位置進行合併,如果左右元素數量不同,報錯!如果左右元素數量相同,分割槽數不一致,也會報錯!
示例**:
val rdd = sc.makerdd(1 to 3, 2)
val rdd2 = sc.makerdd(4 to 6, 2)
rdd.zip(rdd2).zip(rdd2).foreach(println)
res:
((1,4),4)
((2,5),5)
((3,6),6)
相同的,橫向合併還有join運算元,它是針對相同key進行合併。將輸入資料集(k,v)和另外乙個資料集(k,w)進行join,得到(k, (v,w));該操作是對於相同k的v和w集合進行笛卡爾積操作;
zipwithindex是將內容和索引作為二元組返回,不過他是將索引作為value,內容為key返回的。
示例**:
val rdd = sc.makerdd(1 to 3, 2)
rdd.zipwithindex().foreach(println)
res:
(1,0)
(2,1)
(3,2)
可以使用swap將kv翻**
rdd.zipwithindex()
.map(_.swap)
.foreach(println)
res:
(0,1)
(1,2)
(2,3)
zipwithuniquei spark常用運算元詳解
1.map 接收乙個函式,對於rdd中的每乙個元素執行此函式操作,結果作為返回值。eg val rdd sc.parallelize array 1,2,3,4 1 rdd.map x x x foreach println x x x 將元素x做平方處理,scala語句 sparkcontext....
Spark(二)運算元詳解
目錄基於上次的wordcount,我們來寫乙個wordcountcount,來對wc程式進行第二次計數,我們來分析一下效能。package com.littlepage.wc import org.apache.spark.rdd.rdd import org.apache.spark.object...
spark中運算元詳解 aggregateByKey
通過scala集合以並行化方式建立乙個rdd scala val pairrdd sc.parallelize list cat 2 cat 5 mouse 4 cat 12 dog 12 mouse 2 2 pairrdd 這個rdd有兩個區,乙個區中存放的是 cat 2 cat 5 mouse ...