Spark Streaming程式設計指南(四)

2021-08-03 15:57:48 字數 4589 閱讀 8202

部署應用程式

監控應用程式

和rdd類似,dstream允許開發者將流資料持久化到記憶體。使用在dstream上使用persist()方法會自動持久化dstream中的每個edd到記憶體中。這對於dstream需要計算多次的情況非常有用(如在相同資料上進行多個操作)。對於window-based操作(如reducebywindowreducebykeyandwindow)和state-based操作(如updatestatebykey),會隱式地進行持久化,因此,通過window-based操作生成的dstream會自動持久化到記憶體,不需要開發者呼叫persist()

對於通過網路接收的輸入資料流(如kafka,flume,socket等),預設持久化級別設定為在兩個節點中儲存副本,以便進行容錯處理。

注意,不像rdd,dstream的偶人持久化級別會將資料序列化儲存到記憶體中。在之後效能調優中會進一步討論,更多關於持久化級別的資訊參見spark程式設計指南(二)。

streaming應用程式必須7*24小時執行,因此必須對於應用程式邏輯無關的錯誤具有彈性(如系統錯誤,jvm崩潰等)。spark streaming需要將足夠的檢查點資訊放到容錯儲存系統,以便之後從錯誤中恢復,有兩種型別的資料需要設定檢查點。

資料檢查點-儲存生成的rdd到可靠的儲存系統中,對於一些帶狀態的轉換(跨多個批次合併資料)是必要的。在這樣的轉換中,生成的rdd要依賴之前批次的rdd,導致依賴鏈的長度會隨著時間增長。為避免這種無限增長,帶狀態轉換的中間rdd會周期性地在可靠儲存系統中儲存檢查點(如hdfs)來切斷依賴鏈。

總結來說,元資料檢查點用於從驅動程式的錯誤中恢復,如果使用了帶狀態的轉換,資料或rdd檢查點是必要的。

何時啟用檢查點

以下幾種情況必須為應用程式啟用檢查點:

注意,沒有使用帶狀態轉換的簡單streaming應用程式可以不啟用檢查點。在這種情況下,從驅動程式錯誤中恢復也是區域性的(一些已經接收但是沒有處理的資料可能會丟失)。這通常是可接受的,很多spark streaming應用程式用這種方式執行。對非hadoop環境的支援會在未來進行改善。

如何配置檢查點

啟用檢查點,需要設定乙個容錯可靠的檔案系統(如hdfs,s3等)中的目錄,用於儲存檢查點資訊。使用streamingcontext.checkpoint(checkpointdirectory)進行設定。這樣就可以使用帶狀態的轉換了。另外,如果想要讓應用程式從驅動程式的錯誤中恢復,需要重寫streaming應用程式包含以下行為。

這些行為可使用streamingcontext.getorcreate完成,如下。

// function to create and setup a new streamingcontext

def functiontocreatecontext(): streamingcontext =

// get streamingcontext from checkpoint data or create a new one

val context = streamingcontext.getorcreate(checkpointdirectory, functiontocreatecontext _)

// do additional setup on context that needs to be done,

// irrespective of whether it is being started or restarted

context. ...

// start the context

context.start()

context.awaittermination()

如果checkpointdirectory存在,那麼上下文會通過檢查見資料重新建立。如果目錄不存在(如第一次執行),那麼函式functiontocreatecontext會建立新的上下文和dstream。可以參考示例recoverablenetworkwordcount。這個示例程式會像乙個檔案追加網路資料的單詞計數。

除了使用getorcreate之外,還需要保證驅動程式在出現錯誤時會自動重新啟動。這只能通過應用程式的部署方式來完成。之後會進一步討論。

注意,rdd的檢查點會帶來儲存到可靠儲存系統的成本,這會導致需要rdd檢查點的批次處理時間增加。因此,檢查點的時間間隔需要小心設定。對於比較小的批次,每個批次的檢查點可能會明顯減少操作的吞吐量。相反,檢查點太少會導致任務規模的增加,帶來不利影響。對於需要rdd檢查點的帶狀態轉換,預設時間間隔是批時間間隔的倍數,不少於10s。可通過使用dstream.checkpoint(checkpointinterval)進行設定。通常,檢查點時間間隔為5-10個dstream時間間隔是比較好的。

累加器和廣播變數不能從spark streaming的檢查點中恢復。如果啟用了檢查點並且同時使用了累加器或廣播變數,必須為累加器或廣播變數建立懶例項化的單例,以便在驅動程式從失敗中重新啟動後可以重新例項化它們。下面是個示例。

object

wordblacklist }}

instance

}}object

droppedwordscounter }}

instance

}}wordcounts.foreachrdd else

}.collect().mkstring("[", ", ", "]")

val output = "counts at time " + time + " " + counts

})

完整**參見source code。

這一節討論部署spark streaming應用程式的步驟。

要求

要執行spark streaming應用程式,需要以下幾點。

配置寫ahead日誌 - 從spark 1.2開始,引入了寫ahead日誌以實現強大的容錯保證,如果啟用,所有從receiver接收到的資料都會寫入ahead日誌中儲存在配置好的檢查點目錄中。避免了驅動程式恢復時資料丟失的問題,可以做到零資料丟失。可以通過設定spark.streaming.receiver.writeaheadlog.enabletrue來啟用。但是,可能會以降低單個receiver的吞吐量為代價。可通過並行執行多個receiver來增加吞吐量。此外,當寫ahead日誌時,推薦將接收資料的副本關閉,因為日誌已經儲存到複製儲存系統中了。可以通過設定輸入流的儲存等級為storagelevel.memory_and_disk_ser來實現。當使用s3(或者任何不支援flushing的檔案系統)寫ahead日誌時,請家住啟用spark.streaming.driver.writeaheadlog.closefileafterwritespark.streaming.receiver.writeaheadlog.closefileafterwrite。具體參見spark streaming configuration。注意,在啟用i/o加密時,spark不會加密寫入ahead日誌中的資料。如果想要加密ahead日誌中的資料,應該寫入原生支援加密的檔案系統。

設定最大接收速率 - 如果集群資源對應streaming應用程式盡可能快遞處理資料不夠的話,receiver可以進行限速,設定最大接收速率限制records / sec。參見configuration parameters中的receiver引數spark.streaming.receiver.maxrate和kafka引數spark.streaming.kafka.maxrateperpartition。在spark 1.5中,介紹了乙個叫做backpressure的特性,消除設定此速率限制的必要性,spark streaming會自動計算速率限制,並在處理條件改變時動態適配。可以通過設定spark.streaming.backpressure.enabledtrue來啟用backpressure。

公升級應用程式**

如果正在執行的spark streaming應用程式需要公升級到新的應用程式**,有兩種可行的機制。

除了spark的監控功能,spark streaming還有特定的監控功能。當使用streamingcontext時,spark web ui有乙個額外的streamingtab頁,顯示關於正在執行的receiver的統計資料(receiver是否活躍,接收到記錄的數量,receiver錯誤等)以及完成的批次資訊(批次處理時間,佇列延遲等)。這些都可以監控streaming應用程式的程序。

web ui中下面兩個指標尤為重要:

如果批處理時間總是大於批時間間隔和/或排隊延遲持續增加,則說明系統不能快速處理這些批次,正在落後。在這種情況下,要考慮減少批處理時間。

spark streaming的程序也可以使用streaminglistener介面進行監控,這個介面允許獲取receiver的狀態以及處理時間。注意這是乙個開發者api,未來會改進(報告更多的資訊)。

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...

spark streaming讀取kafka示例

spark streaming讀取kafka示例,其中 spark streaming優雅的關閉策略優化部分參考 如何管理spark streaming消費kafka的偏移量部分參考 spark向kafka中寫入資料部分參考 object demo 建立streamingcontext return...