2. 自定義累加器
使用spark自帶的常用累加器的步驟:
1)需求:給出乙個集合,list((「a」, 1), (「a」, 2), (「a」, 3), (「a」, 4)) ,來統計單詞出現的次數,期望的輸出結果:(a, 10)
2)需求分析:
(1)沒有使用累加器的情況
(2)使用累加器的情況
3)**實現
import org.apache.spark.rdd.rdd
import org.apache.spark.util.longaccumulator
import org.apache.spark.
object test08_accumulator
}// 列印是在driver端
println((
"a", sum)
)// (a,0)
// 如果需要通過executor對driver端定義的變數進行更新的話,需要定義累加器實現;累加器和普通變數相比,會將executor端的結果,收集到driver端進行彙總
使用累加器實現資料的聚合功能
.1 宣告累加器
val sum1: longaccumulator = sc.
longaccumulator
("sum1"
) datardd.foreach
}.3 獲取累加器
println
(sum1.value)
//4.關閉連線
sc.stop()
}}
【注】spark中常用的累加器有:longaccumulator、doubleaccumulator、collectionaccumulator。如果常用的累加器滿足不了需求可以自定義累加器來實現需求。
1)需求:自定義累加器,統計集合中首字母為「h」單詞出現的次數。 資料:list(「hello」, 「hello」, 「hello」, 「hello」, 「hello」, 「spark」, 「spark」)
2)**實現
import org.apache.spark.rdd.rdd
import org.apache.spark.util.accumulatorv2
import org.apache.spark.
import scala.collection.mutable
object test09_accumulator2
}// 獲取累加器的值
println
(acc.value)
// 關閉連線
sc.stop()
}}// 自定義乙個類繼承accumulatorv2,設定輸入輸出資料型別
class
myaccumulator
extends
accumulatorv2
[string, mutable.map[string, long]
]// 重置累加器
override def reset()
: unit = map.
clear()
// 向累加器中新增資料
override def add
(elem: string)
: unit =
}// 合併累加器
override def merge
(other: accumulatorv2[string, mutable.map[string, long]])
: unit =}}
// 獲取累加器的值
override def value: mutable.map[string, long]
= map
}
Spark累加器 Accumulator 使用詳解
def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...
Spark的累加器
val conf newsparkconf jk setmaster local val sc newsparkcontext conf val accumulator sc.longaccumulator 傳入array集合,指定兩個分片 val unit sc.makerdd array 1 5...
累加器的作用
1 在運算器中,累加器是專門存放算術或邏輯運算的乙個運算元和運算結果的暫存器。能進行加 減 讀出 移位 迴圈移位和求補等操作。是運算器的主要部分。2 在 處理器cpu中,累加器是一種暫存器,它用來儲存計算所產生的中間結果。如果沒有像累加器這樣的暫存器,那麼在每次計算 加法,乘法,移位等 後就必須要把...