背景:流式任務需要7*24小時執行,但是有時涉及到公升級**需要主動停止程式,但是分布式程式,沒辦法做到乙個個程序去殺死,所有配置優雅的關閉就顯得至關重要了。可以考慮使用外部檔案儲存或者關係型資料庫、快取等來控制內部程式關閉。
此例子使用hdfs建立指定檔案來控制程式的關閉,想要更好的在前端進行控制,可以在註冊程式中修改標誌源。
package com.smothclose
import org.apache.hadoop.conf.configuration
import org.apache.hadoop.fs.
import org.apache.spark.streaming.
class monitorstop(ssc: streamingcontext) extends runnable
val state: streamingcontextstate = ssc.getstate
val bool: boolean = fs.exists(new path("/stopsparktest"))
if (bool) }}
}}
package com.smothclose
import org.apache.log4j.
import org.apache.spark.sparkconf
import org.apache.spark.streaming.dstream.
import org.apache.spark.streaming.
object sparktest
// 如果為true,spark會streamingcontext在jvm關閉時正常關閉,而不是立即關閉。
sparkconf.set("spark.streaming.stopgracefullyonshutdown", "true")
val ssc = new streamingcontext(sparkconf, seconds(5))
ssc.checkpoint("./ck1111")
val line: receiverinputdstream[string] = ssc.sockettextstream("node01", 9999)
val word: dstream[string] = line.flatmap(_.split(" "))
val wordandone: dstream[(string, int)] = word.map((_, 1))
val wordandcount: dstream[(string, int)] = wordandone.updatestatebykey(update)
wordandcount.print()
ssc} def main(args: array[string]): unit =
}
會有乙個常駐執行緒來監控標誌位檔案是否存在,若存在檢查該ssc(streamingcontext)的狀態是否為活躍,若是活躍狀態進行停職該執行緒。
注:需要開啟優雅關閉配置(預設是false)
優雅的停止執行緒
thread類的start 方法啟動多執行緒,thread原本也有提供有停止方法stop 但從1.2開始已經廢除了,因為這種方法可能造成執行緒的死鎖,現在實現執行緒的停止需要通過一種柔和的方式進行。範例 實現執行緒柔和的停止 package multithreading public class s...
如何優雅的停止sparkstreaming程式
直接kill 9?不好吧,萬一我這個程式還在處理資料呢?還沒處理完呢?在處理的資料丟失了呢?但是我又想讓它先停一下呢?好了,直接上 吧 語言組織不好 import org.apache.hadoop.conf.configuration import org.apache.hadoop.fs.imp...
執行緒停止繼續 如何優雅的停止乙個執行緒?
在之前的文章中 i code.online 併發程式設計 執行緒基礎 我們介紹了執行緒的建立和終止,從原始碼的角度去理解了其中的細節,那麼現在如果面試有人問你 如何優雅的停止乙個執行緒?你該如何去回答尼 能不能完美的回答尼?這裡有個思考 當處於sleep時,執行緒能否感受到中斷訊號?對於執行緒的停止...