val conf =
newsparkconf()
.("jk").
setmaster
("local"
) val sc =
newsparkcontext
(conf)
val accumulator = sc.longaccumulator
//傳入array集合,指定兩個分片
val unit = sc.
makerdd
(array(1
,5,3
,4),
2)unit.
foreach
(x=>
)//返回值,驅動端獲取最終的值
println
("sun ="
+accumulator.value)
package com.uu.exe2
import org.apache.spark.util.accumulatorv2
/** * 1.繼承accumulatorv2類
* 2.實現相關的方法
*/class
myaddaccumulator extends accumulatorv2[long, long]
else
}//copy累加器,每個節點與驅動都包含累加器
override def copy()
: accumulatorv2[long, long]
=//重置:分片的累加器返回值後,進行重置
override def reset()
: unit =
//將具體的內容傳入自定義累加器,型別為繼承類的第乙個型別
override def add
(v: long)
: unit =
//合併各個分割槽的值
override def merge
(other: accumulatorv2[long, long]
): unit =
//累加器的返回值型別
override def value: long = i
}
實現了與自身累加器同樣的功能
package com.uu.exe2
import org.apache.spark.
/** * created by ibm on 2020/2/27.
*/object add
)println
("sun ="
+accumulator1.value)
}}
Spark累加器 Accumulator 使用詳解
def accumulator t initialvalue t,name string implicit param org.apache.spark.accumulatorparam t org.apache.spark.accumulator t 第乙個引數應是數值型別,是累加器的初始值,第二...
Spark廣播變數與累加器
在dirver定義乙個變數,executor去使用,如果存在多個task,則會建立多個變數的副本,耗費記憶體。如果當前變數是乙個需要計算的值,在driver端是無法獲取的。scala實現 scala 實現 import org.apache.spark.util.doubleaccumulator ...
Spark累加器和廣播變數
累加器有些類似redis的計數器,但要比計數器強大,不僅可以用於計數,還可以用來累加求和 累加合併元素等。假設我們有乙個word.txt文字,我們想要統計該文字中單詞 sheep 的行數,我們可以直接讀取文字filter過濾然後計數。sc.textfile word.txt filter conta...