Spark Sql之UDAF自定義聚合函式

2021-09-24 16:23:31 字數 2243 閱讀 6275

udaf:user defined aggregate function。使用者自定義聚合函式

我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max、min等函式,可以:

select max(foo) from foobar group by bar;

表示根據bar欄位分組,然後求每個分組的最大值,這時候的分組有很多個,使用這個函式對每個分組進行處理,也可以:

select max(foo) from foobar;

這種情況可以將整張表看做是乙個分組,然後在這個分組(實際上就是一整張表)中求最大值。所以聚合函式實際上是對分組做處理,而不關心分組中記錄的具體數量。

update:各個分組的值內部聚合

merge:各個節點的同一分組的值聚合

evaluate:聚合各個分組的快取值

定義:

/**

* @author administrator

*/class stringcount extends userdefinedaggregatefunction

// bufferschema,指的是,中間進行聚合時,所處理的資料的型別

def bufferschema: structtype =

// datatype,指的是,函式返回值的型別

def datatype: datatype =

def deterministic: boolean =

// 為每個分組的資料執行初始化操作

def initialize(buffer: mutableaggregationbuffer): unit =

/*** 更新 可以認為乙個乙個地將組內的字段值傳遞進來 實現拼接的邏輯

* 相當於map端的combiner,combiner就是對每乙個map task的處理結果進行一次小聚合

* 大聚和發生在reduce端.

* 這裡即是:在進行聚合的時候,每當有新的值進來,對分組後的聚合如何進行計算

* update的結果寫入buffer中,每個分組中的每一行資料都要進行update操作

*/def update(buffer: mutableaggregationbuffer, input: row): unit =

/*** 合併 update操作,可能是針對乙個分組內的部分資料,在某個節點上發生的 但是可能乙個分組內的資料,會分布在多個節點上處理

* 此時就要用merge操作,將各個節點上分布式拼接好的串,合併起來

* 這裡即是:最後在分布式節點完成後需要進行全域性級別的merge操作

* 也可以是乙個節點裡面的多個executor合併 reduce端大聚合

* merge後的結果寫如buffer1中

*/def merge(buffer1: mutableaggregationbuffer, buffer2: row): unit =

// 最後,指的是,乙個分組的聚合值,如何通過中間的快取聚合值,最後返回乙個最終的聚合值

def evaluate(buffer: row): any =

使用:

object udaf 

val structtype = structtype(array(structfield("name", stringtype, true)))  

val namesdf = sqlcontext.createdataframe(namesrowrdd, structtype) 

// 註冊一張names表

namesdf.registertemptable("names")  

// 定義和註冊自定義函式

// 定義函式:自己寫匿名函式

// 註冊函式:sqlcontext.udf.register()

sqlcontext.udf.register("strcount", new stringcount) 

// 使用自定義函式

sqlcontext.sql("select name,strcount(name) from names group by name")  

.collect()

.foreach(println)  

}  /* 結果:

* [jack,1] 

[tom,3] 

[leo,2] 

[marry,1]

*/

spark中使用自定義UDAF

hive中的自定義函式udaf udaf user defined aggregation funcation 使用者自定義弱型別聚合函式 所有的udaf函式在記憶體裡都是一塊buffer 緩衝區 這個換成區被分成了多個塊,每個塊有乙個index,從0開始。聚合乙個資料時,會占用編號為0的塊。遍歷表...

Hive自定義UDF和聚合函式UDAF

hive是一種構建在hadoop上的資料倉儲,hive把sql查詢轉換為一系列在hadoop集群中執行的mapreduce作業,是mapreduce更高層次的抽象,不用編寫具體的mapreduce方法。hive將資料組織為表,這就使得hdfs上的資料有了結構,元資料即表的模式,都儲存在名為metas...

Hive自定義UDF和聚合函式UDAF

一 hive可以允許使用者編寫自己定義的函式udf,來在查詢中使用。hive中有3種udf udf 操作單個資料行,產生單個資料行 udaf 操作多個資料行,產生乙個資料行。udtf 操作乙個資料行,產生多個資料行乙個表作為輸出。二 編寫自己定義的函式udf 第一步 繼承udf或者udaf或者udt...