//local[n] 其中n要大於接受器的個數
val sparkconf =
newsparkconf()
.setmaster
("local[2]").
("networkwordcount"
)val ssc =
newstreamingcontext
(sparkconf,
seconds(1
))//建立乙個接收器
val lines = ssc.
sockettextstream
("localhost"
,9999
)//指定資料來源
val words = lines.
flatmap
(_.split
(" "))
val wordcounts = words.
map(x =
>
(x,1))
.reducebykey
(_ + _)
wordcounts.
print()
//開始
ssc.
start()
//等待終止訊號
ssc.
awaittermination
()
val sparkconf =
newsparkconf()
.("hdfswordcount").
setmaster
("local[2]"
)val ssc =
newstreamingcontext
(sparkconf,
seconds(2
))// 建立fileinputdstream去讀取檔案系統上的資料
val lines = ssc.
textfilestream
("hdfs://hadoop131:9000/data"
)//使用空格進行分割每行記錄的字串
val words = lines.
flatmap
(_.split
(" "))
//類似於rdd的程式設計,將每個單詞賦值為1,並進行合併計算
val wordcounts = words.
map(x =
>
(x,1))
.reducebykey
(_ + _)
wordcounts.
print()
ssc.
start()
ssc.
awaittermination
()
flume資料來源
1、push的方式讀取資料
val conf: sparkconf =
newsparkconf()
.("flumedemo").
setmaster
("local[3]"
) val ssc =
newstreamingcontext
(conf,
seconds(5
))//push 方式 由主機推送資料給sparkstreaming 需要先啟動sparkstreaming
val flumestream: receiverinputdstream[sparkflumeevent]
= flumeutils.
createstream
(ssc,
"hadoop131"
,5678
)//flume 作為sparking streaming 的實時資料流 每一條資料是乙個event 故此時形成的dstream中的資料是乙個乙個的event
//event 有body 和header
flumestream.
map(x=
>
newstring
(x.event.getbody
.array()
).trim)
.flatmap
(_.split
(" "))
.map
((_,1)
).reducebykey
(_+_)
.print()
ssc.
start()
ssc.
awaittermination
()
2、poll的方式獲取資料
val conf: sparkconf =
newsparkconf()
.("flumedemo").
setmaster
("local[3]"
) val ssc =
newstreamingcontext
(conf,
seconds(5
))//poll方式 主動拉取資料,需要先啟動flume
val flumestream=flumeutils.
createpollingstream
(ssc,
"hadoop131"
,5678
) flumestream.
map(x=
>
newstring
(x.event.getbody.
array()
).trim)
.flatmap
(_.split
(" "))
.map
((_,1)
).reducebykey
(_+_)
.print()
ssc.
start()
ssc.
awaittermination
()
kafka資料來源
//設定主函式的引數 第乙個是brokers 第二個是topics 可以使用逗號隔開 傳入多個topics
//sparkstreaming 可以一次性讀取 kafka中的多個topic中的資料
val array
(brokers, topics)
= args
val sparkconf =
newsparkconf()
.("directkafkawordcount").
setmaster
("local[1]"
) val ssc =
newstreamingcontext
(sparkconf,
seconds(2
))val topicsset = topics.
split
(","
).toset
val kafkaparams = map[string, string]
("bootstrap.servers"
-> brokers)
val messages = kafkautils.createdirectstream[string, string]
(ssc,
locationstrategies.preferconsistent,
consumerstrategies.subscribe[string,
string]
(topicsset,kafkaparams)
) messages.
map(_.
value()
)// 取出 value
.flatmap
(_.split
(" "))
// 將字串使用空格分隔
.map
(word =
>
(word,1)
)// 每個單詞對映成乙個 pair
.reducebykey
(_+_)
// 根據每個 key 進行累加
.print()
// 列印前 10 個資料
ssc.
start()
ssc.
awaittermination
()
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...