wordcount案例
案例一:
import org.apache.spark.streaming._
val ssc = new streamingcontext(sc,seconds(5));
val lines = ssc.textfilestream("file:///home/software/stream");
//val lines = ssc.textfilestream("hdfs://hadoop01:9000/wordcount");
val words = lines.flatmap(_.split(" "));
val wordcounts = words.map((_,1)).reducebykey(_+_);
wordcounts.print();
ssc.start();
基本概念
1. streamingcontext
streamingcontext是spark streaming程式設計的最基本環境物件,就像spark程式設計中的sparkcontext一樣。streamingcontext提供最基本的功能入口,包括從各途徑建立最基本的物件dstream(就像spark程式設計中的rdd)。
建立streamingcontext的方法很簡單,生成乙個sparkconf例項,設定程式名,指定執行週期(示例中是5秒),這樣就可以了:
val sc=new sparkcontext(conf)
val ssc = new streamingcontext(sc, seconds(5))
執行週期為5秒,表示流式計算每間隔5秒執行一次。這個時間的設定需要綜合考慮程式的延時需求和集群的工作負
載,應該大於每次的執行時間。
streamingcontext還可以從乙個現存的org.apache.spark.sparkcontext建立而來,並保持關聯,比如上面示例中的建立方法:
val ssc = new streamingcontext(sc, seconds(5))
streamingcontext建立好之後,還需要下面這幾步來實現乙個完整的spark流式計算:
(1)建立乙個輸入dstream,用於接收資料;
(2)使用作用於dstream上的transformation和output操作來定義流式計算(spark程式是使用transformation和action操作);
(3)啟動計算,使用streamingcontext.start();
(4)等待計算結束(人為或錯誤),使用streamingcontext.awaittermination();
(5)也可以手工結束計算,使用streamingcontext.stop()。
2. dstream抽象
dstream(discretized stream)是spark streaming的核心抽象,類似於rdd在spark程式設計中的地位。dstream表示連續的資料流,要麼是從資料來源接收到的輸入資料流,要求是經過計算產生的新資料流。dstream的內部是乙個rdd序列,每個rdd對應乙個計算週期。比如,在上面的wordcount示例中,每5秒乙個週期,那麼每5秒的資料都分別對應乙個rdd,如圖所示,圖中的時間點1、2、3、4代表連續的時間週期。
所有應用在dstream上的操作,都會被對映為對dstream內部的rdd上的操作,比如上面的wordcount示例中對lines dstream的flatmap操作,如下圖
rdd操作將由spark核心來排程執行,但dstream遮蔽了這些細節,給開發者更簡潔的程式設計體驗。當然,我們也可以直接對dstream內部的rdd進行操作(後面會講到)。
案例二:
經過測試,案例一**確實可以監控指定的資料夾處理其中產生的新的檔案
但資料在每個新的週期到來後,都會重新進行計算
而如果需要對歷史資料進行累計處理 該怎麼做呢?
sparkstreaming提供了checkpoint機制,首先需要設定乙個檢查點目錄,在這個目錄,儲存了歷史週期資料。通過在臨時檔案中儲存中間資料 為歷史資料累計處理提供了可能性
import org.apache.spark.streaming._
val ssc = new streamingcontext(sc,seconds(5));
ssc.checkpoint("file:///home/software/chk");
val lines = ssc.textfilestream("file:///home/software/stream");
val result= lines.flatmap(_.split(" ")).map((_,1)).updatestatebykey}
result.print();
ssc.start();
updatestatebykey 方法說明:
1.seq:是乙個序列,存的是某個key的歷史資料
2.op:是乙個值,是某個key當前的值
比如: (hello,1)
①seq裡是空的,some(1)=>some(返回的是歷史值的和+當前值)
②(hello,2),seq(1) op=2 some(1+2)
③(hello,1) , seq(1,2) op=1 some(3+1)
案例三:
但是這上面的例子裡所有的資料不停的累計 一直累計下去
很多的時候我們要的也不是這樣的效果我們希望能夠每隔一段時間重新統計下一段時間的資料,並且能夠對設定的批時間進行更細粒度的控制,這樣的功能可以通過滑動視窗的方式來實現。
在dstream中提供了如下的和滑動視窗相關的方法:
window(windowlength, slideinterval)
windowlength:視窗長度
slideinterval:滑動區間
可以通過以上機制改造案例
注意:視窗長度和滑動長度必須是batch size的整數倍
此外,使用視窗機制,必須要設定檢查點目錄
Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...