Spark Streaming 高階實戰五個例子

2021-09-06 20:02:43 字數 2197 閱讀 4859

實現 計算 過去一段時間到當前時間 單詞 出現的 頻次

object statefulwordcount 

/*** 把當前的資料去更新已有的或者老的資料

* @param currentvalues 當前的

* @param prevalues 老的

* @return

*/def updatefunction(currentvalues: seq[int], prevalues: option[int]): option[int] =

}

/**

* 使用 spark streaming 完成 詞頻統計,並輸出到 mysql 資料庫

* 建立 資料庫

* * 建立資料表

* create table wordcount (

* word varchar(50) default null,

* wordcount int(10) default null

* )*/

def main(args: array[string]): unit =

// })

//2、正確的方式

result.foreachrdd(rdd => )

connection.close()}})

})//3、更好的方式,查閱官方文件,使用 連線池的方式

//存在的問題,這樣每次都會插入新的資料,同樣的單詞頻次字段不會去累加更新

//解決方案 :每次 insert 之前,判斷一下,該單詞是否已經存在資料庫中,如果已經存在則update

//或者 存放在 hbase /redis 中,呼叫相應的api ,直接 插入和更新。

ssc.start()

ssc.awaittermination()

} def createconnection() =

}

window :定時的進行乙個時間段內的資料處理

window length : 視窗的長度

sliding interval : 視窗的間隔

這2個引數和我們的batch size  成倍數關係。如果不是倍數關係執行直接報錯

每隔多久計算某個範圍內的資料:每隔10秒計算前10分鐘的wc ==>每隔 sliding interval  統計 window length 的值

pair.reducebykeyandwindow((a:int,b:int)=>(a+b),seconds(30),seconds(10))

/**

* 黑名單過濾

* * 訪問日誌 ==> dstream

* * 20180808,zs

* 20180808,ls

* 20180808,ww

* * ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)

* * 黑名單列表 ==》 rdd

* zs ls

* ==>(zs:true) (ls:true)

* *

* leftjoin

* (zs:[<20180808,zs>,]) pass ...

* (ls:[<20180808,ls>,]) pass ...

* (ww:[<20180808,ww>,]) ==> tuple1 ok...

* *

*/def main(args: array[string]): unit = )

clicklog.print()

ssc.start()

ssc.awaittermination()

}}

object sqlnetworkwordcount 

ssc.start()

ssc.awaittermination()

}}/** case class for converting rdd to dataframe */

case class record(word: string)

/** lazily instantiated singleton instance of sparksession */

object sparksessionsingleton

instance

}}

Spark Streaming基本概念和高階操作

對於實時流式資料計算,spark通過spark streaming元件提供了支援。spark streaming基於spark核心,具備可擴充套件性 高吞吐 自動容錯等特性,資料 支援kafka flume twitter zeromq kinesis或tcp socket等。處理時可以使用map ...

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...