批處理系統比較容易實現容錯機制,由於檔案可以重複訪問,當某個任務失敗後,重啟該任務即可。但是在流處理系統中,由於資料來源是無限的資料流,乙個流處理任務甚至可能會執行幾個月,將所有資料快取或是持久化,留待以後重複訪問基本上是不可行的。flink基於分布式快照與可部分重發的資料來源實現了容錯,使用者可自定義對整個job進行快照的時間間隔,當出現任務失敗時,flink將整個job恢復到最近一次快照的狀態,並從資料來源重發快照之後的資料。容錯機制會持續的給data flow 拍攝快照,這個快照動作是輕量級的並不影響流處理;流的狀態會儲存在配置的地方;
快照中核心的元素是流標記
(barriers),
barriers
被注入到資料流中,作為資料流的一部分和其他資料一同流動(正如
infosphere
的punctuation
)這些快照標記訊息和其他訊息一樣在dag中流動,但是不會被使用者定義的業務邏輯所處理,每乙個快照標記訊息都將其所在的資料流分成兩部分:本次快照資料和下次快照資料。每個barrier帶有乙個id,該id為將處於該barrier之前的資料歸入快照的檢查點的id。barrier不會打斷資料流的流動,所以它是十分輕量級的。來自不同的快照的多個barrier可以同一時間存在於同乙個流中,也就是說不同的快照可以並行同時發生。
快照標記訊息沿著dag流經各個操作符,當操作符處理到快照標記訊息時,會對自己的狀態進行快照,並儲存起來。當乙個操作符有多個輸入的時候,flink會將先抵達的快照標記訊息及其之後的訊息快取起來,並不處理其他資料流,當所有的輸入中對應該次快照的快照標記訊息全部抵達後,操作符對自己的狀態快照並儲存,之後處理所有快照標記訊息之後的已快取訊息。操作符對自己的狀態快照並儲存可以是非同步與增量的操作,並不需要阻塞訊息的處理。分布式快照的流程如下圖所示:
當中間乙個operator接收到到所有流的快照標記,這個operator會自己儲存快照標記,處理後發射出去;當sink operator已經接收到所有流的快照標記,
它將快照n通知給checkpoint coordinator。在所有sink都通知了乙個快照後,這個快照就完成了;當所有的data sink(終點操作符)都收到快照標記資訊並對自己的狀態快照和儲存後,整個分布式快照就完成了,同時通知資料來源釋放該快照標記訊息之前的所有訊息。若之後發生節點崩潰等異常情況時,只需要恢復之前儲存的分布式快照狀態,並從資料來源重發該快照以後的訊息就可以了。
對於流處理系統來說,流入的訊息是無限的,所以對於聚合或是連線等操作,流處理系統需要對流入的訊息進行分段,然後基於每一段資料進行聚合或是連線等操作。訊息的分段即稱為視窗,流處理系統支援的視窗有很多態別,最常見的就是時間視窗,基於時間間隔對訊息進行分段處理。本節主要介紹flink流處理系統支援的各種時間視窗。
對於目前大部分流處理系統來說,時間視窗一般是根據task所在節點的本地時鐘來進行切分,這種方式實現起來比較容易,不會阻塞訊息處理。但是可能無法滿足某些應用的要求,例如:
1. 訊息本身帶有時間戳,使用者希望按照訊息本身的時間特性進行分段處理。
2. 由於不同節點的時鐘可能不同,以及訊息在流經各個節點時延遲不同,在某個節點屬於同乙個時間視窗處理的訊息,流到下乙個節點時可能被切分到不同的時間視窗中,從而產生不符合預期的結果。
flink支援三種型別的時間視窗,分別適用於使用者對於時間視窗不同型別的要求:
1. operator time。根據task所在節點的本地時鐘來進行切分的時間視窗。
2. event time。訊息自帶時間戳,根據訊息的時間戳進行處理,確保時間戳在同乙個時間視窗的所有訊息一定會被正確處理。由於訊息可能是亂序流入task的,所以task需要快取當前時間視窗訊息處理的狀態,直到確認屬於該時間視窗的所有訊息都被處理後,才可以釋放其狀態。如果亂序的訊息延遲很高的話,會影響分布式系統的吞吐量和延遲。
3. ingress time。有時訊息本身並不帶有時間戳資訊,但使用者依然希望按照訊息而不是節點時鐘劃分時間視窗(例如,避免上面提到的第二個問題)。此時可以在訊息源流入flink流處理系統時,自動生成增量的時間戳賦予訊息,之後處理的流程與event time相同。ingresstime可以看成是event time的乙個特例,由於其在訊息源處時間戳一定是有序的,所以在流處理系統中,相對於eventtime,其亂序的訊息延遲不會很高,因此對flink分布式系統的吞吐量和延遲的影響也會更小。
flink借鑑了google的millwheel專案,通過watermark來支援基於eventtime時間視窗。
當操作符通過基於event time的時間視窗來處理資料時,它必須在確定所有屬於該時間視窗的訊息全部流入此操作符後,才能開始處理資料。但是由於訊息可能是亂序的,所以操作符無法直接確認何時所有屬於該時間視窗的訊息全部流入此操作符。watermark包含乙個時間戳,flink使用watermark標記所有小於該時間戳的訊息都已流入,flink的資料來源在確認所有小於某個時間戳的訊息都已輸出到flink流處理系統後,會生成乙個包含該時間戳的watermark,插入到訊息流中輸出到flink流處理系統中,flink操作符按照時間視窗快取所有流入的訊息,當操作符處理到watermark時,它對所有小於該watermark時間戳的時間視窗的資料進行處理併發送到下乙個操作符節點,然後也將watermark傳送到下乙個操作符節點。
為了保證能夠處理所有屬於某個時間視窗的訊息,操作符必須等到大於這個時間視窗的watermark之後,才能開始對該時間視窗的訊息進行處理,相對於基於operatortime的時間視窗,flink需要占用更多的記憶體,且會直接影響訊息處理的延遲時間。對此,乙個可能的優化措施是,對於聚合類的操作符,可能可以提前對部分訊息進行聚合操作,當有屬於該時間視窗的新訊息流入時,基於之前的部分聚合結果繼續計算,這樣的話,只需快取中間計算結果即可,無需快取該時間視窗的所有訊息。
對於基於event time時間視窗的操作符來說,流入watermark的時間戳與當前節點的時鐘一致是最簡單理想的狀況了,但是在實際環境中是不可能的,由於訊息的亂序以及前面節點處理效率的不同,總是會有某些訊息流入時間大於其本身的時間戳,真實watermark時間戳與理想情況下watermark時間戳的差別稱為time skew,如下圖所示:
time skew決定了該watermark與上乙個watermark之間的時間視窗所有資料需要快取的時間,time skew時間越長,該時間視窗資料的延遲越長,占用記憶體的時間也越長,同時會對流處理系統的吞吐量產生負面影響。
在流處理系統中,由於流入的訊息是無限的,所以對訊息進行排序基本上被認為是不可行的。但是在flink流處理系統中,基於watermark,flink實現了基於時間戳的全域性排序。
flink基於時間戳進行排序的實現思路如下:排序操作符快取所有流入的訊息,當其接收到watermark時,對時間戳小於該watermark的訊息進行排序,併發送到下乙個節點,在此排序操作符中釋放所有時間戳小於該watermark的訊息,繼續快取流入的訊息,等待下乙個watermark觸發下一次排序。由於watermark保證了其之後不會出現時間戳比它小的訊息,所以可以保證排序的正確性。需要注意的是,如果排序操作符有多個節點,只能保證每個節點的流出訊息是有序的,節點之間的訊息不能保證有序,要實現全域性有序,則只能有乙個排序操作符節點。
此部落格借鑑 李呈祥博文
Flink 容錯機制
flink使用的是基於chandy lamport演算法的分布式快照 chandy lamport algorithm 有興趣的同學可以看看。檢查點配置 streamexecutionenvironment env streamexecutionenvironment.getexecutionenv...
Flink容錯機制
所謂的distributed snapshot,就是為了儲存分布式系統的state,那麼首先我們需要定義清楚什麼是分布式系統的state。考慮到上述分布式模型的定義,分布式系統state同樣是由 程序狀態 和 通道狀態 組成的。在某乙個時刻的某分布式系統的所有程序和所有通道狀態的組合,就是這個分布式...
Flink狀態管理和容錯機制介紹
1.1什麼是有狀態的計算 計算任務的結果不僅僅依賴於輸入,還依賴於它的當前狀態,其實大多數的計算都是有狀態的計算。比如wordcount,給一些word,其計算它的count,這是乙個很常見的業務場景。count做為輸出,在計算的過程中要不斷的把輸入累加到count上去,那麼count就是乙個sta...