如果定義了window assigner 之後,下一步就可以定義視窗內資料的計算邏輯,這也就是 window function 的定義。
flink 中提供了四種型別的 window function , 分別為reducefunction、aggregatefunction 以及 processwindowfunction,(sum 和 max)等。
前三種型別的 window fucntion 按照計算原理的不同可以分為兩大類:
一類是增量聚合函式:對應有 reducefunction、aggregatefunction;
另一類是全量視窗函式,對應有 processwindowfunction(還有 windowfunction)。增量聚合函式計算效能較高,占用儲存空間少,主要因為基於中間狀態的計算結果,視窗中只維護中間結果狀態值,不需要快取原始資料。而全量視窗函式使用的代價相對較高, 效能比較弱,主要因為此時運算元需要對所有屬於該視窗的接入資料進行快取,然後等到視窗觸發的時候,對所有的原始資料進行彙總計算。
下面是aggregatefunction一種用法
package com.jh.windows
import org.apache.flink.api.common.functions.aggregatefunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.windowfunction
import org.apache.flink.streaming.api.windowing.assigners.slidingprocessingtimewindows
import org.apache.flink.streaming.api.windowing.time.time
import org.apache.flink.streaming.api.windowing.windows.timewindow
import org.apache.flink.util.collector
object testaggregatefuntionwindow
) stream.map(log => (log.sid, 1.tolong))
.keyby(_._1)//分組
.window(slidingprocessingtimewindows.of(time.seconds(5), time.seconds(3)))//開窗
.aggregate(new myaggregate, new mywindows)//增量聚合
.print()
env.execute("start --->")
} /* windowfunction 需要傳入4個引數:
* in:是輸入的型別 (他的輸入就是aggregate方法的輸出)
* out:是輸出的型別
* key:key的型別
* w <: window :視窗的型別
* */
class mywindows extends windowfunction[long, (string, long), string, timewindow]
}/*aggregate(new myaggregate): 傳入乙個引數的時候返回值是這樣為何?
1> 4
3> 3
3> 5
2> 3
1> 9
因為在aggregate的方法中
override def getresult(accumulator: long): long = accumulator
返回的就是乙個 累加的數值。
這樣我們就沒法知道是哪個key的資料。
(preaggregator: aggregatefunction[t, acc, v],windowfunction: windowfunction[v, r, k, w])
然後我們使用windowfuntion來處理這種問題,傳入兩個引數 ,把aggregate的資料給windowfuntion處理。 */
/*裡面的add方法, 是來一條資料執行一次 ,getresult,在從視窗結束的時候執行一次*/
class myaggregate extends aggregatefunction[(string, long), long, long] }
/* sid: string : 基站的id
callout: string 主叫號碼
callin: string 被叫號碼
calltype: string 呼叫型別
calltime 呼叫時間
duration 通話時長
資料的樣式
station_9,18600005798,18900002238,busy,1577080455129,0
station_4,18600008825,18900008585,busy,1577080457129,0
station_6,18600005404,18900000558,success,1577080457129,5
station_2,18600002658,18900002018,busy,1577080457129,0
station_2,18600004925,18900001911,busy,1577080457129,0
station_5,18600003713,18900000824,busy,1577080457129,0
*/case class stationlog(sid: string, callout: string,callin: string,calltype: string,calltime:long,duration:long)
mongodb中的aggregate 聚合查詢
aggregate類似於pipe.拆分結果然後對結果進行分析求值然後再返回新結果.mongodb聚合 官方api mongodb aggregate 運用篇 個人總結 fycayy 案例一案例二 案例三 那麼aggregate有什麼作用呢?舉個例子 testname文件中有如下幾個集合 集合一 集合...
spark中aggregate函式的應用與問題
aggregate是rdd中比較常用的乙個方法,其功能是使用者傳入的函式進行運算得出結果,屬於action動作。先看下此方法宣告,def aggregate u classtag zerovalue u seqop u,t u,combop u,u u u從原始碼可以看出,aggregate定義的是...
flink學習 flink架構
flink結構 graph 2個併發度 source為1個併發度 的sockettextstreamwordcount四層執行圖的演變過程 jobgraph streamgraph經過優化後生成了 jobgraph,提交給 jobmanager 的資料結構。executiongraph jobman...