Spark執行各種操作的分割槽數解讀

2021-10-07 10:53:38 字數 2224 閱讀 2515

參考資料

//詳細記錄了不同操作下各個分割槽的個數

我們這邊分割槽數是按照什麼規則呢,今天詳細吧這個問題好好看下

分割槽的數量決定了spark任務的並行度

前提 我們的分割槽數都是按照預設規則,沒有人為改變過分區

我們不管是read.csv 還是 textfile 還是spark讀取hive的資料,根源還是相當於讀取hadoop上面的資料,所以他們的分割槽規則是一樣的。原則上其實都是走的hadooprdd相關方法。在測試和實驗中也都能驗證這些先關。(注意:在測試之前看看自己集群的block.size是多大,我們這裡是256m真的是把我給坑壞了。)

對於這塊的分割槽規則,我決定不上原始碼 這片文章有原始碼先關,這裡我只說原始碼中的幾個細節

大家可以比對著源**看我這塊 就是fileinputformat.getsplits的方法

textfile相關

通過textfile方式生成的rdd

如, val rdd = sc.textfile(「path/file」)

有兩種情況:

a、從本地檔案file:///生成的rdd,操作時如果沒有指定分割槽數,則預設分割槽數規則為:

rdd的分割槽數 = max(本地file的分片數, sc.defaultminpartitions)

b、從hdfs分布式檔案系統hdfs://生成的rdd,操作時如果沒有指定分割槽數,則預設分割槽數規則為:

rdd的分割槽數 = max(hdfs檔案的block數目, sc.defaultminpartitions)

1、long goalsize = totalsize / (numsplits == 0 ? 1 : numsplits); 中numsplits的相當於是defaultminpartitions: int = math.min(defaultparallelism, 2) 這裡的defaultparallelism值

由spark.default.parallelism 這個引數決定,在yarn上引數最小是2 也就決定了這個defaultminpartitions的值最小是2

(這裡也有出入,textfile自己也可以去指定)

2、 long minsize = math.max(job.getlong(org.apache.hadoop.mapreduce.lib.input.

fileinputformat.split_minsize, 1), minsplitsize);

這個值預設其實就是1

3、long splitsize = computesplitsize(goalsize, minsize, blocksize); 這一行**中點進去相當於

return math.max(minsize, math.min(goalsize, blocksize));

來確定splitsize 的值。所以這裡就一目了然了。

4、這兩行**相當於是將大檔案就行切分,剩餘的部分在重新初始化乙個陣列, 如果小檔案是直接初始化乙個陣列

while (((double) bytesremaining)/splitsize > split_slop) 

if (bytesremaining != 0)

1、如果乙個目錄中只有乙個小檔案,比如10m,那麼他最少還是得有2個分割槽(在yarn上執行的時候)

2、如果乙個目錄很大,然後裡面有小檔案,那麼小檔案是只占用乙個分割槽的 (這個和原始碼中的totalsize以及由他生成的goalsize有關係,因為他們決定了splitsize,影響了後續bytesremaining)/splitsize > split_slop這個值的大小)

3、如果乙個目錄的大檔案就是按照block進行分片的,比如230m,相當於2個分片(128m,102m)

最後說一嘴這一塊

執行sql這一塊,我發現執行完

session.sql(「select name,count(*) from ceshi.demo group by name」).write.csv("/user/zgh/kkk")

這個以後分割槽變成了200個,這個和spark的引數spark.sql.shuffle.partitions有關係。他的預設值是200,這裡需要注意的是這個引數只對shuffle後的資料起作用,比如sql在做join或者聚合的操作。

如果只是select * 之類不需要聚合的操作這個引數是不起作用的。如果我們還想再沒有shuffle的時候給他改變分割槽的話,我們可以使用repartition方法。

spark更改分割槽 Spark中的分割槽方法詳解

一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...

Spark中RDD的分割槽數時如何的?

看目錄可能方便val rdd sc.parallelize list,6 分割槽數 指定分割槽數 val sc new sparkcontext new sparkconf set spark.default.parallelism 10 setmaster local 4 test 預設分割槽數 ...

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...