storm利用acker bolt節點跟蹤訊息,當spout傳送出去的訊息以及這些訊息所衍生出來的訊息均被處理後,spout將受到對應於該訊息的ack。實現要點:
1、storm中每條傳送出去的訊息都會對應乙個隨機的訊息id。
2、spout傳送訊息後,將向acker bolt傳送一條訊息,該訊息內容為,acker bolt將為該訊息建立一條跟蹤項。
3、bolt產生要傳送的訊息時,會計算每條新訊息的訊息id,並將訊息id傳送至acker bolt,acker bolt對訊息id進行異或後儲存。於是,storm對新傳送的訊息進行了跟蹤。
4、blot對輸入的訊息進行ack時,也會將該訊息id傳送到acker bolt,acker bolt對每條訊息id進行異或儲存,由於該訊息在被傳送時,已經向acker bolt傳送過訊息id,之後再被acker時又再次傳送該訊息id。根據異或的語義,這相當於對該訊息的跟蹤結束。
5、acker bolt在更新某乙個訊息的跟蹤值時,若發現其跟蹤值變為零,則向spout節點傳送訊息,表明spout傳送的這條訊息已經被成功處理。
6、若spout在傳送訊息時未指定用於訊息跟蹤的id,系統則不對訊息進行跟蹤,blot新產生的訊息並不會被單獨跟蹤。
7、spout的每條訊息以及由該訊息演化而來的所有訊息的跟蹤負載為16個位元組,8個位元組的根訊息id以及8個位元組的訊息跟蹤值ackvalue.但是,由於storm中採用hashmap對其進行儲存,在32位的jvm中,每條訊息至少需要20個位元組的額外負載,故一條訊息的跟蹤需要40個位元組左右的負載。
(一)1與2中,spout傳送t1到bolt1,傳送t2到bolt2.t1和t2具有相同的內容,但表示不同的備份,每條訊息都會對應乙個id,訊息t1的anchors為,訊息t2的anchors為
(二)3中,spout在acker bolt中註冊了一條記錄rootid=t1^t2。
(三)4與6中,bolt1傳送新的訊息t3、t4、t5到blot3,同時對輸入的訊息進行ack操作,訊息內容為,此時,acker bolt中的跟蹤項為t2^t3^t4^t5>。
(四)在5中,bolt2對輸入的訊息t2進行ack操作,它沒有產生新的訊息,傳送到acker bolt的訊息為,t2異或後消失。
(五)在7中,blot3對輸入的訊息進行ack操作,傳送的訊息為,此時acker bolt中的跟蹤項為0>
(六)acker bolt發現rootid對應的值為零,它認為該rootid對應的訊息以及所有衍生出來的訊息均已經被成功處理,於是它向spout傳送訊息,而spout將呼叫ack**方法。
那麼每條被處理的訊息必須進行ack或者fail操作,否則,雖然有超時機制可以對過期訊息進行清空,但可能導致訊息不斷重傳。(所以專案中每次進入bolt都有唯一性過濾?)
參考:《storm 原始碼分析》
Storm ack和fail機制再論
之前對這個的理解有些問題,今天用到有仔細梳理了一遍,記錄一下 首先開啟storm tracker機制的前提是,1.在spout emit tuple的時候,要加上第3個引數messageid 2.在配置中acker數目至少為1 3.在bolt emit的時候,要加上第二個引數anchor tuple...
Storm ack和fail機制再論
之前對這個的理解有些問題,今天用到有仔細梳理了一遍,記錄一下 首先開啟storm tracker機制的前提是,1.在spout emit tuple的時候,要加上第3個引數messageid 2.在配置中acker數目至少為1 3.在bolt emit的時候,要加上第二個引數anchor tuple...
Storm ack和fail機制再論《轉》
之前對這個的理解有些問題,今天用到有仔細梳理了一遍,記錄一下 首先開啟storm tracker機制的前提是,1.在spout emit tuple的時候,要加上第3個引數messageid 2.在配置中acker數目至少為1 3.在bolt emit的時候,要加上第二個引數anchor tuple...