Storm的可靠性與ack機制

2021-08-25 22:15:16 字數 1381 閱讀 6616

無論是實時處理還是離線處理,都會遇到乙個不可避免的問題是,失敗任務如何重做?storm提供了乙個ack機制。首先來看一下ispout介面的方法。

public inte***ce ispout extends serializable
可以看到,提供了兩個方法ack()和fail()。裡面的引數是乙個叫msgid的東西。所謂msgid就是tuple的msgid,每個元組在整個topology中都有唯一的id。

在open中提供了乙個引數叫spoutoutputcollector,該collector是專門用於spout傳送訊息的,其中提供了乙個方法叫listemit(listtuple, object messageid)。當然使用沒有messageid的emit()也可,但是這樣就不會觸發ack機制。使用帶有messageid的emit()方法後,該id就會隨著元組從拓撲傳下去。這是ack機制的基礎。

storm會跟蹤spout產生的每乙個tuple,給tuple指定id就可以告訴storm,無論執行成功還是失敗,spout都要接收tuple所傳過的每乙個節點上返回的通知。乙個十分重要的原則是:

衍生這個概念在下文「錨定」章節會講到。

另外,我們需要在spout中的fail方法中手動寫**重新傳送失敗的元組。

storm中有個特殊的task叫acker,檢視原始碼會發現其也是個bolt。acker對於每個spout-tuple儲存乙個ack-val的校驗值,它的初始值是0,然後每發射乙個tuple或ack乙個tuple時,這個tuple的id就要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的tuple都被ack了,那麼最後ack-val的值就一定是0。acker就根據ack-val是否為0來判斷是否完全處理,如果為0則認為已完全處理。

上文提到,元組有其衍生的元組。在ibolt的prepare()方法中,有著跟ispout中open方法類似的乙個引數是:outputcollector collector,該collector不同於spoutoutputcollector,是bolt用於傳送元組的collector,在該collector中的emit()方法其中有一種是:listemit(tuple anchor, listtuple)。其中,anchor是該bolt在execute()方法中接收到的元組。而list則是要傳送的元組資料。這樣就會將anchor和將要傳送到下個bolt的元組聯絡起來,稱之為錨定

如果tuple沒有錨定,則不會觸發ack機制,無法保證可靠性。

所以每個bolt都要呼叫上述emit()方法並且在之後呼叫collector.ack(tuple)或在捕獲異常的時候呼叫collector.fail(tuple)方法。

Storm中的可靠性

storm中的可靠性 storm的ispout介面定義了三個與可靠性有關的方法 nexttuple,ack和fail。public inte ce ispout extends serializable 我們知道,當storm的spout發射乙個tuple後,他便會呼叫nexttuple 方法,在這...

ActiveMQ可靠性機制

訊息的簽收 acknowledgment 客戶端成功接收一條訊息的標誌是這條訊息被簽收。成功接收一條訊息一般包括如下三個階段 1 客戶端接收訊息 2 客戶端處理訊息 3 訊息被簽收 簽收可以由activemq發起,也可以由客戶端發起,取決於session簽收模式的設定。在帶事務的session中,簽...

ActiveMQ JMS的可靠性機制

預設情況下,生產者傳送的訊息是持久化的。訊息傳送到broker以後,producer會等待broker對這條訊息的處理情況的反饋。可以設定訊息傳送端傳送持久化訊息的非同步方式 connectionfactory setuseasyncsend true 回執視窗大小設定 connectionfact...