這個summarizer用於計算樣本各維特徵的均值,方差等常用統計量
class
multivariateonlinesummarizer
extends
multivariatestatisticalsummary
with
serializable has to be >= 0.0")
if (weight == 0.0) return
this
if (n == 0)
require(n == instance.size, s"dimensions mismatch when adding new sample." +
s" expecting $n but got $.")
val localcurrmean = currmean
val localcurrm2n = currm2n
val localcurrm2 = currm2
val localcurrl1 = currl1
val localweightsum = weightsum
val localnumnonzeros = nnz
val localcurrmax = currmax
val localcurrmin = currmin
//迭代每一維特徵
instance.foreachactive
if (localcurrmin(index) > value)
val prevmean = localcurrmean(index)
val diff = value - prevmean
//式(1)
localcurrmean(index) = prevmean + weight * diff / (localweightsum(index) + weight)
//式(2)
localcurrm2n(index) += weight * (value - localcurrmean(index)) * diff
//平方和
localcurrm2(index) += weight * value * value
//l1範數
localcurrl1(index) += weight * math.abs(value)
localweightsum(index) += weight
localnumnonzeros(index) += 1
}} totalweightsum += weight
weightsquaresum += weight * weight
totalcnt += 1
this
}
兩個統計器之間的合併,如果是帶權重的,n就是權重和,a、b是兩個待合併的統計器,x是合併後的nx
=na+
nb⋯⋯
(3)δ
=eb−
ea⋯⋯
(4)e
x=ea
+δ∗n
bnx⋯
⋯(5)
sx=s
a+sb
+δ2∗
na∗n
bnx⋯
⋯(6)
因此spark的實現
@since("1.1.0")
def merge(other: multivariateonlinesummarizer): this.type = .")
totalcnt += other.totalcnt
totalweightsum += other.totalweightsum
weightsquaresum += other.weightsquaresum
var i = 0
while (i < n)
weightsum(i) = totalnnz
nnz(i) = totalcnnz
i += 1
}} else if (totalweightsum == 0.0 && other.totalweightsum != 0.0)
this
}
從**中可以看到,均值和方差的計算都只計算了非0部分的樣本,在返回時,需要計算入為0的部分。這部分樣本實際相當於方差和均值都是0的統計器,與非0部分merge。
還是套用上面merge的公式,nz是非0部分,z代表樣本值為0的部分 δ=
enz−
ez=e
nz⋯⋯
(7)e
all=
ez+δ
∗nnz
nx=e
nz∗∑
nzwi
∑all
wi⋯⋯
(8)
對應**實現
override def
mean: vector =
vectors.dense(realmean)
}
同理,對於方差來說sx
=sz+
snz+
δ2∗n
a∗nb
nx=s
nz+e
2nz∗
∑zwi
∗∑nz
wi∑a
llwi
⋯⋯(9
) 又由wikipedia可知,對於reliability weights來說,其方差d=
∑wi−
∑w2i
/∑wi
⋯⋯(10
)sr=
sd⋯⋯
(11)
因此其**實現
override def
variance: vector =
} vectors.dense(realvariance)
}
currm2n為online計算得到的s,deltamean為均值,第二項就是上式中的第二項,為0的權重和使用所有權重減去非0的權重和得到的,再除以denominator。∥x
∥2=∑
ix2i
−−−−
−√我們統計時計算了平方和,因此只需要取平方根,對應原始碼
override def
norml2: vector =
vectors.dense(realmagnitude)
}
∥x
∥1=∑
i∣xi
∣ 原始碼
override def
norml1: vector =
同時統計了樣本數量count,各維特徵非0個數(numnonzeros,vector),max/min(vector),直接返回對應值即可 spark 求平均值
val rdd sc.makerdd list a 1 a 2 a 3 b 1 b 2 b 3 b 4 a 4 2 rdd.combinebykey x x,1 x int,int y int x.1 y,x.2 1 x int int y int int x.1 y.1,x.2 y.2 mapva...
Spark之K 均值演算法中的一些問題
spark之k 均值演算法中的一些問題 今天看了乙個千人千面推薦系統的公開課,原理上來說並不難,主要是使用k均值演算法,在之前學過了spark下k均值演算法之後,便嘗試使用k均值演算法去模擬簡單的千人千面推薦系統,以後有時間在慢慢研究使用者畫像 資料資料由使用者id,該使用者訪問19個 的次數組成,...
spark中的容錯
一般來說,分布式資料集的容錯性有兩種方式 資料檢查點和記錄資料的更新。面向大規模資料分析,資料檢查點操作成本很高,需要通過資料中心的網路連線在機器之間複製龐大的資料集,而網路頻寬往往比記憶體頻寬低得多,同時還需要消耗更多的儲存資源。因此,spark選擇記錄更新的方式。但是,如果更新粒度太 細太多,那...