Spark Streaming的資料可靠性和一致性

2021-09-02 12:44:27 字數 3194 閱讀 6420

spark streaming自發布起就得到了廣泛的關注,然而作為乙個年輕的專案,需要提公升的地方同樣很多,比如1.2之前版本driver掛掉可能會丟失資料。這裡將分析它的可靠性機制。

眼下大資料領域最熱門的詞彙之一便是流計算了,其中最耀眼的專案無疑是來自spark社群的spark streaming專案,其從一誕生就受到廣泛關注並迅速發展,目前已有追趕並超越storm的架勢。

對於流計算而言,毫無疑問最核心的特點是它的低時延能力,這主要是來自對資料不落磁碟就進行計算的內部機制,但這也帶來了資料可靠性的問題,即有節點失效或者網路異常時,如何在節點間進行合適的協商來進行重傳。更進一步的,若發生計畫外的資料重傳,怎麼能保證沒有產生重複的資料,所有資料都是精確一次的(exact once)?如果不解決這些問題,大資料的流計算將無法滿足大多數企業級可靠性要求而流於徒有虛名。

本文將重點分析spark streaming是如何設計可靠性機制並實現資料一致性的。

由於流計算系統是長期執行、資料不斷流入的,因此其spark守護程序(driver)的可靠性是至關重要的,它決定了streaming程式能否一直正確地執行下去。

圖一 driver資料持久化

driver實現ha的解決方案就是將元資料持久化,以便重啟後的狀態恢復。如圖一所示,driver持久化的元資料報括:

圖二 driver故障恢復

driver失敗重啟後:

通過如上的資料備份和恢復機制,driver實現了故障後重啟、依然能恢復streaming任務而不丟失資料,因此提供了系統級的資料高可靠。

流計算主要通過網路socket通訊來實現與外部io系統的資料互動。由於網路通訊的不可靠特點,傳送端與接收端需要通過一定的協議來保證資料報的接收確認、和失敗重發機制。

不是所有的io系統都支援重發,這至少需要實現資料流的持久化,同時還要實現高吞吐和低時延。在spark streaming官方支援的data source裡面,能同時滿足這些要求的只有kafka,因此在最近的spark streaming release裡面,也是把kafka當成推薦的外部資料系統。

除了把kafka當成輸入資料來源(inbound data source)之外,通常也將其作為輸出資料來源(outbound data source)。所有的實時系統都通過kafka這個mq來做資料的訂閱和分發,從而實現流資料生產者和消費者的解耦。

乙個典型的企業大資料中心資料流向檢視如下所示:

圖三 企業大資料中心資料流向檢視

除了從源頭保證資料可重發之外,kafka更是流資料exact once語義的重要保障。kafka提供了一套低階api,使得client可以訪問topic資料流的同時也能訪問其元資料。spark streaming的每個接收任務可以從指定的kafka topic、partition和offset去獲取資料流,各個任務的資料邊界很清晰,任務失敗後可以重新去接收這部分資料而不會產生「重疊的」資料,因而保證了流資料「有且僅處理一次」。

在spark 1.3版本之前,spark streaming是通過啟動專用的receiver任務來完成從kafka集群的資料流拉取。

receiver任務啟動後,會使用kafka的高階api來建立topicmessagestreams物件,並逐條讀取資料流快取,每個batchinerval時刻到來時由jobgenerator提交生成乙個spark計算任務。

由於receiver任務存在宕機風險,因此spark提供了乙個高階的可靠接收器-reliablekafkareceiver型別來實現可靠的資料收取,它利用了spark 1.2提供的wal(write ahead log)功能,把接收到的每一批資料持久化到磁碟後,更新topic-partition的offset資訊,再去接收下一批kafka資料。萬一receiver失敗,重啟後還能從wal裡面恢復出已接收的資料,從而避免了receiver節點宕機造成的資料丟失(以下**刪除了細枝末節的邏輯):

class reliablekafkareceiver}}

啟用wal後雖然receiver的資料可靠性風險降低了,但卻由於磁碟持久化帶來的開銷,系統整體吞吐率會有明顯的下降。因此,在最新發布的spark 1.3版本裡,spark streaming增加了使用direct api的方式來實現kafka資料來源的訪問。

引入了direct api後,spark streaming不再啟動常駐的receiver接收任務,而是直接分配給每個batch及rdd最新的topic partition offset。job啟動執行後executor使用kafka的****** consumer api去獲取那一段offset的資料。

這樣做的好處不僅避免了receiver宕機帶來的資料可靠性風險,同時也由於避免使用zookeeper做offset跟蹤,而實現了資料的精確一次性(以下**刪除了細枝末節的邏輯):

class directkafkainputdstream
spark 1.2開始提供了預寫日誌能力,用於receiver資料及driver元資料的持久化和故障恢復。wal之所以能提供持久化能力,是因為它利用了可靠的hdfs做資料儲存。

spark streaming預寫日誌機制的核心api包括:

以上核心api在資料接收和恢復階段的互動示意圖如圖四所示。

圖四 基於wal的資料接收和恢復示意圖

從writeaheadlogwriter的原始碼裡可以清楚地看到,每次寫入一塊資料buffer到hdfs後都會呼叫flush方法去強制刷入磁碟,然後才去取下一塊資料。因此receiver接收的資料是可以保證持久化到磁碟了,因而做到了較好的資料可靠性。

private[streaming] class writeaheadlogwriter else 

}flush()

nextoffset = stream.getpos()

segment

}

得益於kafka這類可靠的data source、以及自身的checkpoint/wal等機制,spark streaming的資料可靠性得到了很好的保證,資料能保證「至少一次」(at least once)被處理。但由於其outbound端的一致性實現還未完善,因此exact once語義仍然不能端到端保證。spark streaming社群已經在跟進這個特性的實現(spark-4122),預計很快將合入trunk發布。

作者簡介:葉琪,華為軟體公司universe產品部高階架構師,專注於大資料底層分布式儲存和計算基礎設施,是華為軟體公司hadoop發行版的主要架構師,目前興趣點在流計算與spark。

優雅的停止SparkStreaming

背景 流式任務需要7 24小時執行,但是有時涉及到公升級 需要主動停止程式,但是分布式程式,沒辦法做到乙個個程序去殺死,所有配置優雅的關閉就顯得至關重要了。可以考慮使用外部檔案儲存或者關係型資料庫 快取等來控制內部程式關閉。此例子使用hdfs建立指定檔案來控制程式的關閉,想要更好的在前端進行控制,可...

Spark Streaming入門詳解

背景 使用spark主要是使用spark streaming,spark streaming的魔力之所在於 1.流式處理,如今是乙個流處理時代,一切與流不相關的都是無效的資料。3.spark streaming本身是乙個程式,spark streaming在處理資料的時候會不斷感知資料。所以對構建複...

Spark Streaming 程式監控

官網中指出,spark中專門為sparkstreaming程式的監控設定了額外的途徑,當使用streamingcontext時,在web ui中會出現乙個 streaming 的選項卡,在此選項卡內,統計的內容展示如下 這其中包括接受的記錄數量,每乙個batch內處理的記錄數,處理時間,以及總共消耗...