Spark 實時計算整合案例

2021-09-08 18:26:10 字數 4000 閱讀 4094

最近有同學問道,除了使用 storm 充當實時計算的模型外,還有木有其他的方式來實現實時計算的業務。了解到,在使用 storm 時,需要編寫基於程式語言的**。比如,要實現乙個流水指標的統計,需要去編寫相應的業務**,能不能有一種簡便的方式來實現這一需求。在解答了該同學的疑惑後,整理了該實現方案的乙個案例,供後面的同學學習參考。

實現該方案,整體的流程是不變的,我這裡只是替換了其計算模型,將 storm 替換為 spark,原先的資料收集,儲存依然可以保留。

spark 出來也是很久了,說起它,應該並不會陌生。它是乙個開源的類似於 hadoop mapreduce 的通用平行計算模型,它擁有 hadoop mapreduce 所具有的有點,但與其不同的是,mapreduce 的 job 中間輸出結果可以儲存在記憶體中,不再需要回寫磁碟,因而,spark 能更好的適用於需要迭代的業務場景。

通過上圖,我們可以看出,首先是採集上報的日誌資料,將其存放於訊息中介軟體,這裡訊息中介軟體採用的是 kafka,然後在使用計算模型按照業務指標實現相應的計算內容,最後是將計算後的結果進行持久化,db 的選擇可以多樣化,這裡筆者就直接使用了 redis 來作為演示的儲存介質,大家所示在使用中,可以替換該儲存介質,比如將結果存放到 hdfs,hbase cluster,或是 mysql 等都行。這裡,我們使用 spark sql 來替換掉 storm 的業務實現編寫。

在介紹完上面的內容後,我們接下來就去實現該內容,首先我們要生產資料來源,實際的場景下,會有上報好的日誌資料,這裡,我們就直接寫乙個模擬資料類,實現**如下所示:

object kafkaiploginproducer  else 

} def plat(): string =

def ip(): string =

def country(): string =

def city(): string =

def location(): jsonarray =

def main(args: array[string]): unit =

}}

上面**,通過 thread.sleep() 來控制資料生產的速度。接下來,我們來看看如何實現每個使用者在各個區域所分布的情況,它是按照座標分組,平台和使用者id過濾進行累加次數,邏輯用 sql 實現較為簡單,關鍵是在實現過程中需要注意的一些問題,比如物件的序列化問題。這裡,細節的問題,我們先不討論,先看下實現的**,如下所示:

object iploginanalytics 

// create a streamingcontext with the given master url

val ssc = new streamingcontext(conf, seconds(5))

// kafka configurations

val topics = set("test_data3")

val brokers = "dn1:9092,dn2:9092,dn3:9092"

val kafkaparams = map[string, string](

"metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.stringencoder")

val iploginhashkey = "mf::ip::login::" + sdf.format(new date())

// create a direct stream

val kafkastream = kafkautils.createdirectstream[string, string, stringdecoder, stringdecoder](ssc, kafkaparams, topics)

val events = kafkastream.flatmap(line => )

def func(iter: iterator[(string, string)]): unit =

}events.foreachrdd based on

*/object internalredisclient extends serializable

def makepool(redishost: string, redisport: int, redistimeout: int,

maxtotal: int, maxidle: int, minidle: int, testonborrow: boolean,

testonreturn: boolean, maxwaitmillis: long): unit =

sys.addshutdownhook(hook.run)}}

def getpool: jedispool =

}// redis configurations

val maxtotal = 10

val maxidle = 10

val minidle = 1

val redishost = "dn1"

val redisport = 6379

val redistimeout = 30000

internalredisclient.makepool(redishost, redisport, redistimeout, maxtotal, maxidle, minidle)

val jedis = internalredisclient.getpool.getresource

while (results.hasnext)

}ssc.start()

ssc.awaittermination()

}}/** case class for converting rdd to dataframe */

case class record(plat: string, uid: string, tm: string, country: string, location: string)

/** lazily instantiated singleton instance of sqlcontext */

object sqlcontextsingleton

instance

}}

我們在開發環境進行測試的時候,使用 local[k] 部署模式,在本地啟動 k 個 worker 執行緒來進行計算,而這 k 個 worker 在同乙個 jvm 中,上面的示例,預設使用 local[k] 模式。這裡我們需要普及一下 spark 的架構,架構圖來自 spark 的官網,[鏈結位址]

這裡,不管是在 local[k] 模式,standalone 模式,還是 mesos 或是 yarn 模式,整個 spark cluster 的結構都可以用改圖來闡述,只是各個元件的執行環境略有不同,從而導致他們可能執行在分布式環境,本地環境,亦或是乙個 jvm 實利當中。例如,在 local[k] 模式,上圖表示在同一節點上的單個程序上的多個元件,而對於 yarn 模式,驅動程式是在 yarn cluster 之外的節點上提交 spark 應用,其他元件都是執行在 yarn cluster 管理的節點上的。

而對於 spark cluster 部署應用後,在進行相關計算的時候會將 rdd 資料集上的函式傳送到集群中的 worker 上的 executor,然而,這些函式做操作的物件必須是可序列化的。上述**利用 scala 的語言特性,解決了這一問題。

在完成上述**後,我們執行**,看看預覽結果如下,執行結果,如下所示:

整體的實現內容不算太複雜,統計的業務指標,這裡我們使用 sql 來完成這部分工作,對比 storm 來說,我們專注 sql 的編寫就好,難度不算太大。可操作性較為友好。

spark實時計算效能優化

1 計算提供兩種模式,一種是jar包本地計算 一種是jsf服務。在執行時衝突,storm也在用服務。4 第三步在此擴量到1000,採用增加執行緒方式,效能達到25ms左右。已在預發 5 第四步召回集在擴量,如效能瓶頸是io,則使用jar包本地計算,但與jdq衝突。需要將線上上報遷移到統一上報服務,服...

Spark Streaming實時計算框架介紹

隨著大資料的發展,人們對大資料的處理要求也越來越高,原有的批處理框架mapreduce適合離線計算,卻無法滿足實時性要求較高的業務,如實時推薦 使用者行為分析等。spark streaming是建立在spark上的實時計算框架,通過它提供的豐富的api 基於記憶體的高速執行引擎,使用者可以結合流式 ...

Spark Streaming實時計算框架介紹

隨著大資料的發展,人們對大資料的處理要求也越來越高,原有的批處理框架mapreduce適合離線計算,卻無法滿足實時性要求較高的業務,如實時推薦 使用者行為分析等。spark streaming是建立在spark上的實時計算框架,通過它提供的豐富的api 基於記憶體的高速執行引擎,使用者可以結合流式 ...