通過繼承accumulatorv2可以實現自定義累加器。
官方案例可參考:
下面是我自己寫的乙個統計卡種數量的案例。
package com.shuai7boy.myscalacodeimport org.apache.spark.
import org.apache.spark.util.accumulatorv2
case
class card(var card1count: int, var
card2count: int)
class
calccardcount extends accumulatorv2[card, card]
/** * * 複製乙個新的物件
** @return
*/override def copy(): accumulatorv2[card, card] =
/** * * 重置每個分割槽的數值
*/override def reset(): unit =
/** * 每個分割槽累加自己的數值
** @param v
*/override def add(v: card): unit =
/** * * 合併分割槽值,求得總值
** @param other
*/override def merge(other: accumulatorv2[card, card]): unit =other match
} //返回結果
override def value: card =result
}object
cardcount
cc.add(cardinfo)
})cardmaprdd.count()
//執行action,觸發上面的累加操作
println("
card1總數量為:
" + cc.result.card1count + "
,card2總數量為:
" +cc.result.card2count)
}}
列印結果是:
card1總數量為:11,card2總數量為:7
通過上面**,就可以同時統計兩個變數的值了,當然如果需要更多,可以擴充套件。預設的累加器只實現了乙個。
Spark自定義累加器的使用例項詳解
累加器 accumulator 是spark中提供的一種分布式的變數機制,其原理類似於mapreduce,即分布式的改變,然後聚合這些改變。累加器的乙個常見用途是在除錯時對作業執行過程中的事件進行計數。累加器簡單使用 spark內建的提供了long和double型別的累加器。下面是乙個簡單的使用示例...
Spark的累加器
val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...
Spark累加器 Accumulator 使用詳解
def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...