aggregate是rdd中比較常用的乙個方法,其功能是使用者傳入的函式進行運算得出結果,屬於action動作。先看下此方法宣告,
def aggregate[u: classtag](zerovalue: u)(seqop: (u, t) => u, combop: (u, u) => u): u
從原始碼可以看出,aggregate定義的是泛型引數,u是資料型別可以傳入任意型別,seqop是使用者傳入的計算方法,combop是合併方法,
seqop方法是作用於每個分片,comop是合併分片的結果,如果只有乙個分片則合併zerovalue與seqop的結果。
下面看具體例項就能明白了
val arrrdd = sc.parallelize(array(1,2,3,4,9,5),1)
val max = arrrdd.aggregate(1)(math.max(_,_),_ + _)
上面的執行結果是10,math.max是找出每個分片中的最大值,這裡只有乙個分片,所以是9,第二個引數是把zerovalue和第乙個方法的結果加起來,所以是10,一般情況下zerovalue應設為0
再來看多個分片的情況
val arrrdd = sc.parallelize(array(1,2,3,4,9,5),3)
val max = arrrdd.aggregate(1)(math.max(_,_),_ + _)
執行結果是16,為什麼是16,這就是上面說的seqop方法作用於每個分片,也就是找出每個分片的最大值,這裡第乙個分片是2,第二個分片是4,第二個分片是9,然後第二個引數combop方法把這三個結果和zerovalue加起來。
在應用此方法時關鍵是要注意第乙個方法是作用於每個分片,當分片數目不同時得出的結果可能是不同的!在生產環境中不注意這點,運算結果有問題也是很難查詢的!
Spark高階運算元aggregate所遇到的坑
先對區域性聚合,再對全域性聚合 示例 val rdd1 sc.parallelize list 1,2,3,4,5 2 檢視每個分割槽中的元素 將每個分割槽中的最大值求和,注意 初始值是0 如果初始值時候10,則結果為 30,因為在區域性操作和全域性操作的時候都要計算初始值 如果是求和,注意 初始值...
mongodb中的aggregate 聚合查詢
aggregate類似於pipe.拆分結果然後對結果進行分析求值然後再返回新結果.mongodb聚合 官方api mongodb aggregate 運用篇 個人總結 fycayy 案例一案例二 案例三 那麼aggregate有什麼作用呢?舉個例子 testname文件中有如下幾個集合 集合一 集合...
Spark中executor memory引數詳解
我們知道,spark執行的時候,可以通過 executor memory 來設定executor執行時所需的memory。但如果設定的過大,程式是會報錯的,如下 555.png 那麼這個值最大能設定多少呢?本文來分析一下。文中安裝的是spark1.6.1,安裝在hadoop2.7上。1 相關的2個引...