我們可以有兩種方式來更加優雅的停止流程式,分別是通過http暴露服務,和通過hdfs做訊息中轉來定時掃瞄mark檔案是否存在來觸發關閉服務。
下面我們先來看下通過http暴露服務的核心**:
/****
* 負責啟動守護的jetty服務
* @param port 對外暴露的埠號
* @param ssc stream上下文
/*** 負責接受http請求來優雅的關閉流
* @param ssc stream上下文
*/class closestreamhandler(ssc:streamingcontext) extends abstracthandler
}
然後在來看下另一種方式掃瞄hdfs檔案的方式:
/***
* 通過乙個訊息檔案來定時觸發是否需要關閉流程式
* @param ssc streamingcontext
*/def stopbymarkfile(ssc:streamingcontext):unit= }}
/***
* 判斷是否存在mark file
* @param hdfs_file_path mark檔案的路徑
* @return
*/def i***istsmarkfile(hdfs_file_path:string):boolean=
上面是兩種方式的核心**,最後提下觸發停止流程式:
第一種需要在啟動服務的機器上,執行下面封裝的指令碼:
## tx.log是提交spark任務後的輸出log重定向的log
echo "stop finish"
第二種方式,找到乙個擁有hdfs客戶端機器,向hdfs上寫入指定的檔案:
#生成檔案後,10秒後程式就會自動停止
hadoop fs -touch /spark/streaming/stop
#下次啟動前,需要清空這個檔案,否則程式啟動後就會停止
hadoop fs -rm -r /spark/streaming/stop
** Spark Streaming入門詳解
背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...
Spark Streaming 程式監控
官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...
spark streaming讀取kafka示例
spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...