對於7×24小時不間斷執行的流程式來說,要保證fault tolerant是很難的,這不像是離線任務,如果失敗了只需要清空已有結果,重新跑一次就可以了。對於流任務,如果要保證能夠重新處理已處理過的資料,就要把資料儲存下來;而這就面臨著幾個問題:比如一是儲存多久的資料?二是重複計算的資料應該怎麼處理,怎麼保證冪等性?
對於乙個流系統,我們有以下希望:
最好能做到exactly-once
處理延遲越低越好
吞吐量越高越好
計算模型應當足夠簡單易用,又具有足夠的表達力
從錯誤恢復的開銷越低越好
足夠的流控制能力(背壓能力)
storm的ack機制
storm的fault tolerant是這樣工作的:每乙個被storm的operator處理的資料都會向其上乙個operator傳送乙份應答訊息,通知其已被下游處理。storm的源operator儲存了所有已傳送的訊息的每乙個下游運算元的應答訊息,當它收到來自sink的應答時,它就知道該訊息已經被完整處理,可以移除了。
如果沒有收到應答,storm就會重發該訊息。顯而易見,這是一種at least once的邏輯。另外,這種方式面臨著嚴重的冪等性問題,例如對乙個count運算元,如果count的下游運算元出錯,source重發該訊息,那麼防止該訊息被count兩遍的邏輯需要程式設計師自己去實現。最後,這樣一種處理方式非常低效,吞吐量很低。
sparkstreaming 的moni batch
storm的實現方式就注定了與高吞吐量無緣。那麼,為了提高吞吐量,把一批資料聚集在一起處理就是很自然的選擇。spark streaming的實現就是基於這樣的思路。
我們可以在完全的連續計算與完全的分批計算中間取折中,通過控制每批計算資料的大小來控制延遲與吞吐量的制約,如果想要低延遲,就用小一點的batch,如果想要大吞吐量,就不得不忍受更高的延遲(更久的等待資料到來的時間和更多的計算)。
以這樣的方式,可以在每個batch中做到exactly-once,但是這種方式也有其弊端:
首先,batch的方式使得一些需要跨batch的操作變得非常困難,例如session window;使用者不得不自己想辦法去實現相關邏輯。
其次,batch模式很難做好背壓。當乙個batch因為種種原因處理慢了,那麼下乙個batch要麼不得不容納更多的新來資料,要麼不得不堆積更多的batch,整個任務可能會被拖垮,這是乙個非常致命的問題。
最後,batch的方式基本意味著其延遲是有比較高的下限的,實時性上不好。
flink的容錯
我們在傳統資料庫,如mysql中使用binlog來完成事務,這樣的思路也可以被用在實現exactly-once模型中。例如,我們可以log下每個資料元素每一次被處理時的結果和當時所處的操作符的狀態。這樣,當我們需要fault tolerant時,我們只需要讀一下log就可以了。這種模式規避了storm和spark所面臨的問題,並且能夠很好的實現exactly-once,唯一的弊端是:如何盡可能的減少log的成本?flink給了我們答案。
實現exactly-once的關鍵是什麼?是能夠準確的知道和快速記錄下來當前的operator的狀態、當前正在處理的元素(以及正處在不同運算元之間傳遞的元素)。如果上面這些可以做到,那麼fault tolerant無非就是從持久化儲存中讀取上次記錄的這些元資訊,並且恢復到程式中。那麼flink是如何實現的呢?
flink的分布式快照的核心是其輕量級非同步分布式快照機制。為了實現這一機制,flink引入了乙個概念,叫做barrier。barrier是一種標記,它被source產生並且插入到流資料中,被傳送到下游節點。當下游節點處理到該barrier標誌時,這就意味著在該barrier插入到流資料時,已經進入系統的資料在當前節點已經被處理完畢。
如圖所示,每當乙個barrier流過乙個運算元節點時,就說明了在該運算元上,可以觸發一次檢查點,用以儲存當前節點的狀態和已經處理過的資料,這就是乙份快照。(在這裡可以聯想一下micro-batch,把barrier想象成分割每個batch的邏輯,會好理解一點)這樣的方式下,記錄快照就像和前面提到的micro-batch一樣容易。
與此同時,該運算元會向下游傳送該barrier。因為資料在運算元之間是按順序傳送的,所以當下游節點收到該barrier時,也就意味著同樣的一批資料在下游節點上也處理完畢,可以進行一次checkpoint,儲存基於該節點的乙份快照,快照完成後,會通知jobmananger自己完成了這個快照。這就是分布式快照的基本含義。
有時,有的運算元的上游節點和下游節點都不止乙個,應該怎麼處理呢?如果有不止乙個下游節點,就向每個下游傳送barrier。同理,如果有不止乙個上游節點,那麼就要等到所有上游節點的同一批次的barrier到達之後,才能觸發checkpoint。因為每個節點運算速度不同,所以有的上游節點可能已經在發下個barrier週期的資料了,有的上游節點還沒傳送本次的barrier,這時候,當前運算元就要快取一下提前到來的資料,等比較慢的上游節點傳送barrier之後,才能處理下一批資料。
當整個程式的最後乙個運算元sink都收到了這個barrier,也就意味著這個barrier和上個barrier之間所夾雜的這批元素已經全部落袋為安。這時,最後乙個運算元通知jobmanager整個流程已經完成,而jobmanager隨後發出通知,要求所有運算元刪除本次快照內容,以完成清理。這整個部分,就是flink的兩階段提交的checkpoint過程,如下面四幅圖所示:
總之,通過這種方式,flink實現了我們前面提到的六項對流處理框架的要求:exactly-once、低延遲、高吞吐、易用的模型、方便的恢復機制。
大資料流式計算 storm 基礎
一 目標 二 storm是什麼?開源的 分布式 流式計算系統 三 分布式起源 四 批量計算與流式計算的對比 流式計算 批量計算的api 推特的summing bird 谷歌的clouddataflow,介面均開源。五 storm元件 主從結構 簡單 高效,但主節點存在單點問題 對稱結構 複雜 效率較...
大資料讀書筆記 2 流式計算
早期和當前的 流式計算 系統分別稱為 連續查詢處理類 和 可擴充套件資料流平台類 計算系統。流式計算系統的特點 1 低延遲 2 極佳的系統容錯性 3 極強的系統擴充套件能力 4 靈活強大的應用邏輯表達能力 目前典型的流式計算系統 s4,storm,millwheel,samza,d stream,h...
大資料處理系統都有哪些? 流式計算系統
流式計算系統就是因為流式計算具有很強的實時性,需要對應用源源不斷產生的資料實時進行處理,使資料不積壓 不丟失,常用於處理電信 電力等行業應用以及網際網路行業的訪問日誌等。在facebook 的 scribe apache的 flume twitter的 storm yahoo的s4 ucberkel...