以 spark 中的 wordcount 作業為例,每個spark作業其實都是乙個 application,每個 application 對應多個 jobs,乙個 action 操作(比如 collect)觸發乙個 job,在wordcount程式中有乙個 job,每個 job 拆成多個 stage(發生 shuffle 的時候回拆分出乙個 stage),reducebykey 處會發生 shuffle。reducebykey 這裡,相當於 stage0 的task在最後執行到 reducebykey 的時候會為每個 stage1 的task都建立乙份檔案(也可能是合併在少量的檔案裡面),每個 stage1 的 task會去各個節點上的各個 task 建立的屬於自己的那乙份檔案裡面拉取資料,每個 stage1 的 task 拉取到的資料一定是相同 key 對應的資料。對相同的key,對應的values,才能去執行我們自定義的function操作(_ + _)。
task會找到 hdfs 上屬於自己的對應資料,每個 task 處理一小塊資料 hdfs block,然後依次去執行運算元操作。task 缺省會建立三份檔案,每乙個檔案裡面,一定是存放相同的key對應的values;但是乙個檔案裡面可能有多個key,以及其對應的values;相同key的values一定是進入同乙個檔案。
下乙個 stage 的每個 task 都會去上乙個 stage 的 task 里拉去屬於自己的那份檔案。
並行度其實就是指的是spark作業中各個stage的task數量,也就代表了spark作業的在各個階段(stage)的並行度。
如果不調節並行度,導致並行度過低會怎麼樣??
假設現在已經在spark-submit指令碼裡面,給我們的spark作業分配了足夠多的資源,比如有50個 executor,每個executor 有10g記憶體,每個 executor 有3個cpu core,基本已經達到了集群或者yarn佇列的資源上限。
如果 task 沒有設定,或者設定的很少,比如就設定了100個 task。現在50個 executor,每個executor 有3個cpu core,也就是說,你的application任何乙個 stage 執行的時候都有總數在150個 cpu core,可以並行執行。但是你現在只有100個task,平均分配一下,每個executor 分配到2個task,那麼同時在執行的task只有100個,每個executor只會並行執行2個task,每個executor剩下的乙個 cpu core 就浪費掉了。
你的資源雖然分配足夠了,但是問題是,並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。
合理的並行度的設定,應該是要設定的足夠大,大到可以完全合理的利用你的集群資源。比如上面的例子,總共集群有150個cpu core,可以並行執行150個task。那麼就應該將你的application 的並行度至少設定成150才能完全有效的利用你的集群資源,讓150個task並行執行,而且task增加到150個以後,既可以同時並行執行,還可以讓每個task要處理的資料量變少。比如總共150g的資料要處理,如果是100個task,每個task計算1.5g的資料,現在增加到150個task可以並行執行,而且每個task主要處理1g的資料就可以。
很簡單的道理,只要合理設定並行度,就可以完全充分利用你的集群計算資源,並且減少每個task要處理的資料量,最終,就是提公升你的整個spark作業的效能和執行速度。
task數量,至少設定成與spark application的總cpu core數量相同(最理想情況是:比如總共150個cpu core,分配了150個task,一起執行,差不多同一時間執行完畢)。
官方推薦task數量設定成spark application總cpu core數量的2~3倍,比如150個cpu core基本要設定task數量為300~500。實際情況與理想情況不同的,有些task會執行的快一點,比如50s就完了,有些task可能會慢一點,比如要1分半才執行完,所以如果你的task數量,剛好設定的跟cpu core數量相同,可能還是會導致資源的浪費,因為,比如150個task,10個先執行完了,剩餘140個還在執行,但是這個時候,有10個cpu core就空閒出來了,就導致了浪費。那如果task數量設定成cpu core總數的2~3倍,那麼乙個task執行完了以後,另乙個task馬上可以補上來,就盡量讓cpu core不要空閒,同時也是盡量提公升spark作業執行的效率和速度,提公升效能。
如何設定乙個spark application的並行度??
可以使用引數spark.default.parallelism設定,比如
sparkconf conf = new sparkconf()
.set("spark.default.parallelism", "500")分配更多資源屬於「重劍無鋒」型別:真正有分量的一些技術和點,其實都是看起來比較平凡,看起來沒有那麼「炫酷」,但是其實是你每次寫完乙個spark作業,進入效能調優階段的時候,應該優先調節的事情,就是這些(大部分時候,可能資源和並行度到位了,spark作業就很快了,幾分鐘就跑完了)?
後續會進行一些「炫酷」的調優方式,比如資料傾斜(100個spark作業,最多10個會出現真正嚴重的資料傾斜問題)jvm調優等等。
spark調優,調節並行度
spark並行度指的是什麼?並行度 其實就是指的是,spark作業中,各個stage的task數量,也就代表了sprark作業的各個階段 stage 的並行度。如果不調節,那麼導致並行度過低,會怎麼樣?假設,現在已經在spark submit指令碼中給我們的spark作業分配了足夠的資源,比如50個...
spark效能調優之提高並行度
並行度就是spark作業中,各個stage的task數量,也就代表了spark作業的在各個階段 stage 的並行度。如果不調節並行度,導致並行度過低,會怎麼樣?假設,現在已經在spark submit指令碼裡面,給我們的spark作業分配了足夠多的資源,比如50個executor,每個execut...
spark調優 並行度調優
乙個job的劃分為乙個action操作觸發 乙個job可以被分為多個stage,在乙個lineage中,發生shuffle操作時會拆分乙個stage,shuffle操作一般發生在以下的幾個運算元中,distinct groupbykey reducebykey aggregatebykey join...