spark更改分割槽 如何管理Spark的分割槽

2021-10-17 00:15:20 字數 2821 閱讀 8037

當我們使用spark載入資料來源並進行一些列轉換時,spark會將資料拆分為多個分割槽partition,並在分割槽上並行執行計算。所以理解spark是如何對資料進行分割槽的以及何時需要手動調整spark的分割槽,可以幫助我們提公升spark程式的執行效率。

什麼是分割槽

關於什麼是分割槽,其實沒有什麼神秘的。我們可以通過建立乙個dataframe來說明如何對資料進行分割槽:

scala> val x = (1 to 10).tolist

x: list[int] = list(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val numsdf = x.todf("num")

numsdf: org.apache.spark.sql.dataframe = [num: int]

建立好dataframe之後,我們再來看一下該datafame的分割槽,可以看出分割槽數為4:

scala> numsdf.rdd.partitions.size

res0: int = 4

當我們將dataframe寫入磁碟檔案時,再來觀察一下檔案的個數,

scala> numsdf.write.csv("file:///opt/modules/data/numsdf")

可以發現,上述的寫入操作會生成4個檔案

每個分割槽的資料如下:

part-00000: 1, 2

part-00001: 3, 4, 5

part-00002: 6, 7

part-00003: 8, 9, 10

coalesce操作

原始碼def coalesce(numpartitions: int): dataset[t] = withtypedplan 解釋

返回乙個按照指定分割槽列的新的dataset,具體的分割槽數量有引數spark.sql.shuffle.partitions預設指定,該預設值為200,該操作與hivesql的distribute by操作類似。

repartition除了可以指定具體的分割槽數之外,還可以指定具體的分割槽字段。我們可以使用下面的示例來**如何使用特定的列對dataframe進行重新分割槽。

首先建立dataframe:

val people = list(

("jack","male"),

("alice","female"),

("tom","male"),

("angela","female"),

("tony","male")

val peopledf = people.todf("name","gender")

讓我們按gender列對dataframe進行分割槽:

scala> val genderdf = peopledf.repartition($"gender")

genderdf: org.apache.spark.sql.dataset[org.apache.spark.sql.row] = [name: string, gender: string]

按列進行分割槽時,spark缺省會建立200個分割槽。此示例將有兩個帶有資料的分割槽,其他分割槽將沒有資料。

scala> genderdf.rdd.partitions.size

res23: int = 200

一些注意點

該如何設定分割槽數量

假設我們要對乙個大資料集進行操作,該資料集的分割槽數也比較大,那麼當我們進行一些操作之後,比如filter過濾操作、sample取樣操作,這些操作可能會使結果資料集的資料量大量減少。但是spark卻不會對其分割槽進行調整,由此會造成大量的分割槽沒有資料,並且向hdfs讀取和寫入大量的空檔案,效率會很低,這種情況就需要我們重新調整分數數量,以此來提公升效率。

通常情況下,結果集的資料量減少時,其對應的分割槽數也應當相應地減少。那麼該如何確定具體的分割槽數呢?分割槽過少:將無法充分利用群集中的所有可用的cpu core

分割槽過多:產生非常多的小任務,從而會產生過多的開銷

在這兩者之間,第乙個對效能的影響相對比較大。對於小於1000個分割槽數的情況而言,排程太多的小任務所產生的影響相對較小。但是,如果有成千上萬個分割槽,那麼spark會變得非常慢。

spark中的shuffle分割槽數是靜態的。它不會隨著不同的資料大小而變化。上文提到:預設情況下,控制shuffle分割槽數的引數spark.sql.shuffle.partitions值為200,這將導致以下問題對於較小的資料,200是乙個過大的選擇,由於排程開銷,通常會導致處理速度變慢。

對於大資料,200很小,無法有效使用群集中的所有資源

一般情況下,我們可以通過將集群中的cpu數量乘以2、3或4來確定分割槽的數量。如果要將資料寫出到檔案系統中,則可以選擇乙個分割槽大小,以建立合理大小的檔案。

該使用哪種方法進行重分割槽呢?

對於大型資料集,進行shuffle操作是很消耗效能的,但是當我們的資料集比較小的時候,可以使用repartition方法進行重分割槽,這樣可以盡量保證每個分割槽的資料分布比較均勻(使用coalesce可能會造成資料傾斜),對於下游使用者來說效率更高。

如何將資料寫入到單個檔案

通過使用repartition(1)和coalesce(1))可用於將dataframe寫入到單個檔案中。通常情況下,不會只將資料寫入到單個檔案中,因為這樣效率很低,寫入速度很慢,在資料量比較大的情況,很可能會出現寫入錯誤的情況。所以,只有當dataframe很小時,我們才會考慮將其寫入到單個檔案中。

何時考慮重分割槽

一般對於在對比較大的資料集進行過濾操作之後,產生的較小資料集,通常需要對其考慮進行重分割槽,從而提公升任務執行的效率。

總結本文主要介紹了spark是如何管理分割槽的,分別解釋了spark提供的兩種分割槽方法,並給出了相應的使用示例和分析。最後對分割槽情況及其影響進行了討論,並給出了一些實踐的建議。希望本文對你有所幫助。

spark更改分割槽 Spark中的分割槽方法詳解

一 spark資料分割槽方式簡要 在spark中,rdd resilient distributed dataset 是其最基本的抽象資料集,其中每個rdd是由若干個partition組成。在job執行期間,參與運算的partition資料分布在多台機器的記憶體當中。這裡可將rdd看成乙個非常大的陣...

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...

Spark的儲存管理

功能上看spark的儲存管理模型可以分為兩部分 rdd快取和shuffle資料的持久化.rdd快取,指的是rdd呼叫cache persist 或checkpoint,呼叫這個三個方法會將rdd對應的資料塊結果儲存到記憶體或者磁碟中,可以將寬依賴的結果儲存下來.shuffle資料持久化 說明 def...