①、把磁碟上的500g資料分割為100塊(chunks),每份5gb。(注意,要留一些系統空間!)
②、順序將每份5gb資料讀入記憶體,使用quick sort演算法排序。
③、把排序好的資料(也是5gb)存放回磁碟。
④、迴圈100次,現在,所有的100個塊都已經各自排序了。(剩下的工作就是如何把它們合併排序!)
⑤、從100個塊中分別讀取5g/100=0.05 g入記憶體(100input buffers)。
⑥、執行100路合併,並將合併結果臨時儲存於5g基於記憶體的輸出緩衝區中。當緩衝區寫滿5gb時,寫入硬碟上最終檔案,並清空輸出緩衝區;當100個輸入緩衝區中任何乙個處理完畢時,寫入該緩衝區所對應的塊中的下乙個0.05 gb,直到全部處理完成。
首先從原始碼角度來看:
// pairrddfunctions.scala
def countbykey(): map[k, long] = self.withscope
// rdd.scala
def countbyvalue()(implicit ord: ordering[t] = null): map[t, long] = withscope
countbyvalue(rdd.scala)
countbykey(pairrddfunctions.scala)
問題:
val rdd1: rdd[int] = sc.makerdd(1 to 10)
val rdd2: rdd[(int, int)] = sc.makerdd((1 to 10).tolist.zipwithindex)
val result1 = rdd1.countbyvalue() //可以
val result2 = rdd1.countbykey() //語法錯誤
val result3 = rdd2.countbyvalue() //可以
val result4 = rdd2.countbykey() //可以
其中join操作是考驗所有資料庫效能的一項重要指標,對於spark來說,考驗join的效能就是shuffle,shuffle 需要經過磁碟和網路傳輸,shuffle資料越少效能越好,有時候可以盡量避免程式進行shuffle ,那麼什麼情況下有shuffle ,什麼情況下沒有shuffle 呢
broadcast join 比較好理解,除了自己實現外,spark sql
已經幫我們預設來實現了,其實就是小表分發到所有executors
,控制引數是:spark.sql.autobroadcastjointhreshold
預設大小是10m, 即小於這個閾值即自動使用broadcast join
.
其實rdd方式和table類似,不同的是後者要寫入bucket表,這裡主要講rdd的方式,原理就是,當兩個rdd根據相同分割槽方式,預先做好分割槽,分割槽結果是一致的,這樣就可以進行bucket join, 另外這種join沒有預先的運算元,需要在寫程式時候自己來開發,對於表的這種join可以看一下 位元組跳動在spark sql上的核心優化實踐 。可以看下下面的例子
rdd1、rdd2都是pair rdd
rdd1、rdd2的資料完全相同
一定有shuffle
rdd1 => 5個分割槽
rdd2 => 6個分割槽
rdd1 => 5個分割槽 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
rdd2 => 5個分割槽 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
一定沒有shuffle
rdd1 => 5個分割槽 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
rdd2 => 5個分割槽 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
這樣所有shuffle
的運算元,如果資料提前做好了分割槽(partitionby
),很多情況下沒有shuffle
.
除上面兩種方式外,一般就是有shuffle
的join
, 關於spark的join原理可以檢視:大資料開發-spark join原理詳解
有個運算元例外,那就是sortbykey,其底層有個抽樣演算法,水塘抽樣,最後需要根據抽樣的結果,進行rangepartition的,所以從job角度來說會看到兩個job,除了觸發action的本身運算元之外,記住下面的
sortbykey → 水塘抽樣→ collect
靈魂拷問 如何提高系統的效能?
昨天發現了系統的效能問題,於是乎開發人員問我,你有好的建議嗎?我竟然一臉懵逼。看來,多讓別人提問你可以提高自己的水平,於是本人陷入沉思,如何該提高系統的效能?容哀家好好思考一番 一般來講,系統是由哪幾個方面組成的?資料庫 應用 網路組成的,那應該從這三方面去定位和提公升。資料的處理 資料備份 資料離...
靈魂拷問 說說你對前端的理解
大多數人對前端的認識可能是寫點簡單的 html,css 樣式,製作點網頁介面,其實不然。2005 年之前,前端開發這個詞還是很少見的,尤其在國內。但近幾年的前端可謂是飛速發展,在沒有前端崗位之前,乙個專案中負責後台功能邏輯處理的 和前端頁面展示 混在一起,不免會給開發和維護帶來各種問題,漸漸的許多大...
大資料開發 Spark 閉包的理解
閉包是乙個函式,返回值依賴於宣告在函式外部的乙個或多個變數。閉包通常來講可以簡單的認為是可以訪問乙個函式裡面區域性變數的另外乙個函式。如下面這段匿名的函式 val multiplier i int i 10函式體內有乙個變數 i,它作為函式的乙個引數。如下面的另一段 val multiplier i...