spark中的online均值 方差統計

2021-09-30 14:23:32 字數 3352 閱讀 8444

這個summarizer用於計算樣本各維特徵的均值,方差等常用統計量

class

multivariateonlinesummarizer

extends

multivariatestatisticalsummary

with

serializable has to be >= 0.0")

if (weight == 0.0) return

this

if (n == 0)

require(n == instance.size, s"dimensions mismatch when adding new sample." +

s" expecting $n but got $.")

val localcurrmean = currmean

val localcurrm2n = currm2n

val localcurrm2 = currm2

val localcurrl1 = currl1

val localweightsum = weightsum

val localnumnonzeros = nnz

val localcurrmax = currmax

val localcurrmin = currmin

//迭代每一維特徵

instance.foreachactive

if (localcurrmin(index) > value)

val prevmean = localcurrmean(index)

val diff = value - prevmean

//式(1)

localcurrmean(index) = prevmean + weight * diff / (localweightsum(index) + weight)

//式(2)

localcurrm2n(index) += weight * (value - localcurrmean(index)) * diff

//平方和

localcurrm2(index) += weight * value * value

//l1範數

localcurrl1(index) += weight * math.abs(value)

localweightsum(index) += weight

localnumnonzeros(index) += 1

}} totalweightsum += weight

weightsquaresum += weight * weight

totalcnt += 1

this

}

兩個統計器之間的合併,如果是帶權重的,n就是權重和,a、b是兩個待合併的統計器,x是合併後的nx

=na+

nb⋯⋯

(3)δ

=eb−

ea⋯⋯

(4)e

x=ea

+δ∗n

bnx⋯

⋯(5)

sx=s

a+sb

+δ2∗

na∗n

bnx⋯

⋯(6)

因此spark的實現

@since("1.1.0")

def merge(other: multivariateonlinesummarizer): this.type = .")

totalcnt += other.totalcnt

totalweightsum += other.totalweightsum

weightsquaresum += other.weightsquaresum

var i = 0

while (i < n)

weightsum(i) = totalnnz

nnz(i) = totalcnnz

i += 1

}} else if (totalweightsum == 0.0 && other.totalweightsum != 0.0)

this

}

從**中可以看到,均值和方差的計算都只計算了非0部分的樣本,在返回時,需要計算入為0的部分。這部分樣本實際相當於方差和均值都是0的統計器,與非0部分merge。

還是套用上面merge的公式,nz是非0部分,z代表樣本值為0的部分 δ=

enz−

ez=e

nz⋯⋯

(7)e

all=

ez+δ

∗nnz

nx=e

nz∗∑

nzwi

∑all

wi⋯⋯

(8)

對應**實現

override def

mean: vector =

vectors.dense(realmean)

}

同理,對於方差來說sx

=sz+

snz+

δ2∗n

a∗nb

nx=s

nz+e

2nz∗

∑zwi

∗∑nz

wi∑a

llwi

⋯⋯(9

) 又由wikipedia可知,對於reliability weights來說,其方差d=

∑wi−

∑w2i

/∑wi

⋯⋯(10

)sr=

sd⋯⋯

(11)

因此其**實現

override def

variance: vector =

} vectors.dense(realvariance)

}

currm2n為online計算得到的s,deltamean為均值,第二項就是上式中的第二項,為0的權重和使用所有權重減去非0的權重和得到的,再除以denominator。∥x

∥2=∑

ix2i

−−−−

−√我們統計時計算了平方和,因此只需要取平方根,對應原始碼

override def

norml2: vector =

vectors.dense(realmagnitude)

}

∥x

∥1=∑

i∣xi

∣ 原始碼

override def

norml1: vector =

同時統計了樣本數量count,各維特徵非0個數(numnonzeros,vector),max/min(vector),直接返回對應值即可

spark 求平均值

val rdd sc.makerdd list a 1 a 2 a 3 b 1 b 2 b 3 b 4 a 4 2 rdd.combinebykey x x,1 x int,int y int x.1 y,x.2 1 x int int y int int x.1 y.1,x.2 y.2 mapva...

Spark之K 均值演算法中的一些問題

spark之k 均值演算法中的一些問題 今天看了乙個千人千面推薦系統的公開課,原理上來說並不難,主要是使用k均值演算法,在之前學過了spark下k均值演算法之後,便嘗試使用k均值演算法去模擬簡單的千人千面推薦系統,以後有時間在慢慢研究使用者畫像 資料資料由使用者id,該使用者訪問19個 的次數組成,...

spark中的容錯

一般來說,分布式資料集的容錯性有兩種方式 資料檢查點和記錄資料的更新。面向大規模資料分析,資料檢查點操作成本很高,需要通過資料中心的網路連線在機器之間複製龐大的資料集,而網路頻寬往往比記憶體頻寬低得多,同時還需要消耗更多的儲存資源。因此,spark選擇記錄更新的方式。但是,如果更新粒度太 細太多,那...