資料來源:
初始化sparksession
package com.kfk.spark.common
import org.apache.spark.sql.sparksession
/** * @author : 蔡政潔
* @email :[email protected]
* @date : 2020/12/2
* @time : 10:02 下午
*/object commsparksessionscala
}
自定義udf將內容改為大寫:
package com.kfk.spark.sql
import com.kfk.spark.common.
/** * @author : 蔡政潔
* @email :[email protected]
* @date : 2020/12/8
* @time : 4:46 下午
*/object udfscala
}
package com.kfk.spark.sql
import org.apache.spark.sql.row
import org.apache.spark.sql.expressions.
import org.apache.spark.sql.types.
/** * @author : 蔡政潔
* @email :[email protected]
* @date : 2020/12/8
* @time : 7:13 下午
*/class mycount extends userdefinedaggregatefunction
/** * 中間進行聚合的時候所處理的資料型別
* @return
*/override
def bufferschema: structtype =
/** * 返回型別
* @return
*/override
def datatype: datatype =
/** * 校驗返回值
* @return
*/override
def deterministic:
boolean
=/**
* 為每個分組的資料執行初始化操作
* @param buffer
*/override
def initialize(buffer: mutableaggregationbuffer)
:unit
=/**
* 每個分組有新的資料進來的時候,如何進行分組對應的聚合值的計算
* @param buffer
* @param input
*/override
def update(buffer: mutableaggregationbuffer, input: row)
:unit
=/**
* 在每乙個節點上的集合值要進行最後的merge
* @param buffer1
* @param buffer2
*/override
def merge(buffer1: mutableaggregationbuffer, buffer2: row)
:unit
=/**
* 返回最終結果
* @param buffer
* @return
*/override
def evaluate(buffer: row)
:any
=}
自定義聚合函式udaf,count
package com.kfk.spark.sql
import com.kfk.spark.common.
/** * @author : 蔡政潔
* @email :[email protected]
* @date : 2020/12/8
* @time : 8:06 下午
*/object udafscala
}
Spark Sql之UDAF自定義聚合函式
udaf user defined aggregate function。使用者自定義聚合函式 我們可能下意識的認為udaf是需要和group by一起使用的,實際上udaf可以跟group by一起使用,也可以不跟group by一起使用,這個其實比較好理解,聯想到mysql中的max min等函...
Spark SQL自定義函式 第五章
1.自定義函式分類 類似於hive當中的自定義函式,spark同樣可以使用自定義函式來實現新的功能。spark中的自定義函式有如下3類 1.udf user defined function 輸入一行,輸出一行 2.udaf user defined aggregation funcation 輸入...
Spark sql 自定義讀取資料源
通常在乙個流式計算的主流程裡,會用到很多對映資料,比較常見的是text文件,但是文件讀進來之後還要匹配相應的schema,本文通過自定義textsource資料來源,自動讀取預設的schema。defaultsource.scala package com.wxx.bigdata.sql custo...