cogroup
,groupwith
,join
,leftouterjoin
,rightouterjoin
,groupbykey
,reducebykey
,combinebykey
,partitionby
,sort
,mapvalues
(如果父rdd存在partitioner),flatmapvalues
(如果父rdd存在partitioner), 和filter
(如果父rdd存在partitioner)。
其他的transform操作不會影響到輸出rdd的partitioner,一般來說是none,也就是沒有partitioner。
下面舉個例子進行說明:
scala> val pairs = sc.parallelize(list((1, 1), (2, 2), (3, 3)))
pairs: org.apache.spark.rdd.rdd[(int, int)] =
parallelcollectionrdd[4] at parallelize at :12
scala> val a = sc.parallelize(list(2,51,2,7,3))
a: org.apache.spark.rdd.rdd[int] =
parallelcollectionrdd[5] at parallelize at :12
scala> val a = sc.parallelize(list(2,51,2))
a: org.apache.spark.rdd.rdd[int] =
parallelcollectionrdd[6] at parallelize at :12
scala> val b = sc.parallelize(list(3,1,4))
b: org.apache.spark.rdd.rdd[int] =
parallelcollectionrdd[7] at parallelize at :12
scala> val c = a.zip(b)
c: org.apache.spark.rdd.rdd[(int, int)] =
zippedpartitionsrdd2[8] at zip at :16
scala> val result = pairs.join(c)
result: org.apache.spark.rdd.rdd[(int, (int, int))] =
scala> result.partitioner
res6: option[org.apache.spark.partitioner] = some(org.apache.spark.hashpartitioner@2)
大家可以看到輸出來的rdd result分割槽變成了hashpartitioner
,因為join中的兩個分割槽都沒有設定分割槽,所以預設用到了hashpartitioner,可以看join的實現:
def join[w](other: rdd[(k, w)]): rdd[(k, (v, w))] =
def defaultpartitioner(rdd: rdd[_], others: rdd[_]*): partitioner =
if (rdd.context.conf.contains("spark.default.parallelism")) else
}
defaultpartitioner
函式就確定了結果rdd的分割槽。從上面的實現可以看到,
1、join
的兩個rdd如果都沒有partitioner
,那麼join
結果rdd將使用hashpartitioner
;
2、如果兩個rdd中其中有乙個有partitioner
,那麼join結果rdd將使用那個父rdd的partitioner
;
3、如果兩個rdd都有partitioner
,那麼join結果rdd就使用呼叫join的那個rdd的partitioner
。
spark 的RDD分割槽
rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...
Spark中RDD分割槽以及節點
spark中rdd分割槽 對於二元rdd使用時,例如在使用join 時 我們對資料集是如何分割槽的卻一無所知。預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那台機器上對所有鍵相同的記錄進行連線操作,會非常消耗效能,如果乙個資料集設...
Spark中RDD的分割槽數時如何的?
看目錄可能方便val rdd sc.parallelize list,6 分割槽數 指定分割槽數 val sc new sparkcontext new sparkconf set spark.default.parallelism 10 setmaster local 4 test 預設分割槽數 ...