Spark中RDD分割槽以及節點

2021-10-05 02:58:59 字數 2084 閱讀 9424

spark中rdd分割槽

對於二元rdd使用時,例如在使用join()時 我們對資料集是如何分割槽的卻一無所知。預設情況下,連線操作會將兩個資料集中的所有鍵的雜湊值都求出來,將該雜湊值相同的記錄通過網路傳到同一臺機器 上,然後在那台機器上對所有鍵相同的記錄進行連線操作,會非常消耗效能,如果乙個資料集設定分割槽,那麼在join時spark就知道如何分割槽第二個資料集就會預設按照第乙個進行資料分發,降低了網路傳輸損耗。

事實上,許多其他 spark 操作會自動為結果 rdd 設定已知的分割槽方式資訊,而且除 join() 外還有很多操作也會利用到已有的分割槽資訊。比如,sortbykey() 和 groupbykey() 會分別生成範圍分割槽的 rdd 和雜湊分割槽的 rdd。而另一方面,諸如 map() 這樣的操作會 導致新的 rdd 失去父 rdd 的分割槽資訊,因為這樣的操作理論上可能會修改每條記錄的鍵。可以使用mapvalues();

xxrdd.partitionby(new spark.hashpartitioner(nums)).persist()

如果不呼叫 persist() 的話,(是轉換操作)後續的 rdd 操作會對 partitioned 的整個譜系重新求值,這會導致對資料集 一遍又一遍地進行雜湊分割槽操作。

driver和worker

driver驅動器

driver會根據當前的worker節點集合,嘗試把所有任務基於資料所在位置分配給合適的worker程序。當任務執行時,worker程序會把快取資料儲存起來,而driver程序同樣會跟蹤這些快取資料的位置,並且利用這些位置資訊來排程以後的任務,以儘量減少資料的網路傳輸。

worker執行器

執行原理

我們使用spark-submit提交乙個spark作業之後,這個作業就會啟動乙個對應的driver程序。根據你使用的部署模式(deploy-mode)不同,driver程序可能在本地啟動,也可能在集群中某個工作節點上啟動。driver程序本身會根據我們設定的引數,占有一定數量的記憶體和cpu core。而driver程序要做的第一件事情,就是向集群管理器(可以是spark standalone集群,也可以是其他的資源管理集群,yarn作為資源管理集群)申請執行spark作業需要使用的資源,這裡的資源指的就是executor程序。yarn集群管理器會根據我們為spark作業設定的資源引數,在各個工作節點上,啟動一定數量的executor程序,每個executor程序都占有一定數量的記憶體和cpu core。

在申請到了作業執行所需的資源之後,driver程序就會開始排程和執行我們編寫的作業**了。driver程序會將我們編寫的spark作業**分拆為多個stage,每個stage執行一部分**片段,並為每個stage建立一批task,然後將這些task分配到各個executor程序中執行。task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個**片段),只是每個task處理的資料不同而已。乙個stage的所有task都執行完畢之後,會在各個節點本地的磁碟檔案中寫入計算中間結果,然後driver就會排程執行下乙個stage。下乙個stage的task的輸入資料就是上乙個stage輸出的中間結果。如此迴圈往復,直到將我們自己編寫的**邏輯全部執行完,並且計算完所有的資料,得到我們想要的結果為止。

shuffle過程記憶體分配

shuffle 過程記憶體分配使用 shufflememorymanager 類管理,會針對每個 task 分配記憶體,task 任務完成後通過 executor 釋放空間.這裡可以把 task 理解成不同 key 的資料對應乙個 task. 早期的記憶體分配機制使用公平分配,即不同 task 分配的記憶體是一樣的,但是這樣容易造成記憶體需求過多的 task 的 outofmemory, 從而造成多餘的 磁碟 io 過程,影響整體的效率.(例:某乙個 key 下的資料明顯偏多,但因為大家記憶體都一樣,這乙個 key 的資料就容易 outofmemory).1.5版以後task 共用乙個記憶體池,記憶體池的大小預設為 jvm 最大執行時記憶體容量的16%,分配機制如下:假如有 n 個 task,shufflememorymanager 保證每個 task 溢位之前至少可以申請到1/2n 記憶體,且至多申請到1/n,n 為當前活動的 shuffle task 數,因為n 是一直變化的,所以 manager 會一直追蹤 task 數的變化,重新計算佇列中的1/n 和1/2n.

spark 的RDD分割槽

rdd的倆種建立方 1.從集合中建立rdd,spark主要提供了兩種函式 parallelize和makerdd 使用parallelize 從集合建立 scala val rdd sc.parallelize array 1,2,3,4,5,6,7,8 使用makerdd 從集合建立 scala ...

spark 獲取RDD的方式以及從分割槽中獲益的操作

spark獲取rdd的分割槽方式 使用partitioner物件,本質上是告訴我們rdd中各個健分別屬於哪個分割槽。isdefined 用來判斷該物件是不是有值 get 使用get來獲取其中的值 date.partitioner.isdefined date.partitioner.get spar...

Spark中RDD的分割槽數時如何的?

看目錄可能方便val rdd sc.parallelize list,6 分割槽數 指定分割槽數 val sc new sparkcontext new sparkconf set spark.default.parallelism 10 setmaster local 4 test 預設分割槽數 ...