hive中的自定義函式udaf
udaf(user- defined aggregation funcation),使用者自定義弱型別聚合函式
所有的udaf函式在記憶體裡都是一塊buffer(緩衝區),這個換成區被分成了多個塊,每個塊有乙個index,從0開始。聚合乙個資料時,會占用編號為0的塊。
遍歷表中的每一行資料,然後扔到udaf中做聚合,先把buffer中已存的資料拿出來和新的資料做合併,然後再扔到buffer中,下次在拿乙個新的資料做相同的過程。
二 自定義去重udaf
import org.apache.spark.sql.rowimport org.apache.spark.sql.expressions.
import org.apache.spark.sql.types.
class groupconcatdistinct extends userdefinedaggregatefunction
override def update(buffer: mutableaggregationbuffer, input: row): unit = else
//對buffer進行更新,更新的塊是0,更新的資料是buffercityinfo
buffer.update(0, buffercityinfo)}}
//將兩個自定義udaf的值彙總到一起
override def merge(buffer1: mutableaggregationbuffer, buffer2: row): unit = else}}
buffer1.update(0, buffercityinfo1)
}//獲取最後的值
override def evaluate(buffer: row): any =
}三 udaf在spark中的註冊使用
註冊:(1)sparksession.udf.register("concat_long_string", (v1:long, v2:string, split:string) =>)
(2)sparksession.udf.register("group_concat_distinct", new groupconcatdistinct)
使用:val sql = "select area,pid,count(*) click_count,"+"group_concat_distinct(concat_long_string(city_id,city_name,':')) " +"city_infos" +" from tmp_area_basic_info group by area,pid "
Spark Sql之UDAF自定義聚合函式
udaf user defined aggregate function。使用者自定義聚合函式 我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max min等函...
Spark自定義排序
在這之前,我們先準備一些資料,使用rdd存放 獲得sparkcontext val conf sparkconf newsparkconf setmaster local 2 val sc newsparkcontext conf val rdd sc.parallelize list 公尺家雷射投...
Spark自定義排序
spark支援我們自定義,只需要繼承相應的類就可以了,我在下面準備了乙個用身高和年齡做二次排序的例子,希望可以幫到大家 首先寫乙個排序類 名字 年齡 身高 class people val name string val age int,val hight int extends ordered p...