概念:storm的ack-fail機制也就是storm的可靠訊息處理機制,通俗來講就是給spout發出的每個tuple帶上乙個messageid,然後這個spout下面的每乙個bolt
都會給他返回乙個完成情況,只有當每乙個bolt都返回了正確的結果,整個傳送過程才算成功,任何乙個bolt處理不成功,則不成功。
我對ack-fail機制的講解分為三個層面:分別是api應用也就是寫**方面、ack-fail機制的處理過程。
首先是**編寫方面:
假設我們在這個系統中有一種spout和一種bolt,如果你不使用ack-fail機制那麼乙個spout中有三個方法,分別是open(),nexttuple()和outputfields(),
open的作用是初始化那個outputcollector,nexttuple方法就是不斷地取值然後發給下乙個bolt或者就結束了,declareoutputfields方法就是宣告一下我發
射出去的資料id,如果你使用了ack-fail機制那就多了倆方法,ack()和fail(),傳送成功了就調ack方法,不成功就調fail,你在fail裡可以進行重發或者什
麼的,當然這些都是你自己決定,你要是不想做處理函式裡就啥都不寫,我們這裡進行重發:
myspout
fail(object msgid)
ack(object msgid)
declareoutputfields();
}mybolt方法本來的三個方法是prepare(),execute(),declareoutputfields(),prepare方法主要也就是初始化那個outputcollector,execute方法就是執行處理
過程,declareoutputfields也一樣就是宣告一下我發出去的是啥,而在應用了ack-fail機制的bolt中,這裡要顯示的宣告我處理完了:
mybolt
}然後是ack-fail的處理過程方面:
spout---->tuple1---->bolt1---->ack(tuple1)
bolt1---->tuple1-1---->bolt2-1---->ack(tuple1-1);
bolt1---->tuple1-2---->bolt2-1---->ack(tuple1-2);
..........................
只有當每乙個bolt都正確ack了,整個傳送過程才算成功,任何乙個bolt處理不成功,則不成功,重新處理。
那麼ack這個東西他如何判斷前者發射的tuple和ack返回的tuple是不是同乙個呢,這裡主要的概念是異或處理,對於每乙個spout發射任務,ack維護了這樣
一組資料,,spouttaskid標誌著唯一的乙個spouttask,rootid標誌著整個過程的結果,ackvalue記錄著整個過程中不同
的tuple相異或的時候結果的變化,當ackvalue最終等於0的時候,就標誌著整個過程成功了,那麼這個rootid如何計算呢,我們知道每乙個tuple的發射過
程bolt都給了相應的返回tupleid,當這兩個tupleid相同時就表明這一小階段的任務完成了,而tuplid轉化成二進位制是0101形式的,如果返回的tupleid和
這個傳送的tupleid相異或等於0,也就是ackvalue等於0,就證明這兩個是同乙個id,也就表明這一小部分的任務成功了,但是整個過程中可能會有多層bolt,
每乙個bolt的執行速度可能不同,所以注意,如果這些所有結果相異或後,ackvalue等於0,就表明這個傳輸任務完成了。
最後我們來從底層實現來講一下ack-fail機制:
我們執行storm程式時會發現有這樣乙個任務-acktask,看一下原始碼他是繼承了bolt,他就是乙個和其他資料處理的bolt一起存在一起處理的程序,而實際上
整個過程中是存在兩種tuple的,分別是datatuple和acktuple,datatuple主要負責資料的處理,acktuple負責整個過程的排錯,我們先來看這個acktuple,
他其實封裝了acktuple,rootid標識了這個tuple屬於哪個過程,而tupleid標識了每乙個特定的tuple,這個acktuple最終封裝成乙個
messageid這樣乙個物件,而datatuple中就含有這個messageid。接下來我們來看整個過程,兩種執行緒是一起進行的,ack的執行緒比較簡單,當spout發射
乙個datatuple時同時就會發射乙個acktuple,然後他就在這等待響應,spout將datatuple(messageid(acktuple))傳送給bolt,bolt.execute(datatuple)
之後會應答也就是bolt.ack(datatuple),而datatuple中封裝了acktuple,就可以還原出這個acktuple,這樣acktask就等到了ack應答,也就是說這一階段
處理成功,以此類推。
上述過程示意:
spout.emit(datatuple(messageid(acktuple)))--->bolt.excute--->bolt.ack(datatuple(acktuple))
spout.emit(acktuple)
這就是我理解的整個ack-fail機制。
Storm訊息容錯機制(ack fail機制)
storm訊息容錯機制 ack fail 1 介紹 給每個tuple指定 id告訴 storm 系統,無論處理成功還是失敗,spout 都要接收 tuple 樹上所有節點返回的通知。如果處理成功,spout 的ack 方法將會對編號是 msgid 的訊息應答確認 如果處理失敗或者超時,會呼叫 fai...
storm中fieldsGroup的機制
說實話storm功能非常強大,但是參考資料是在是太少了,有些只能自己摸索,專案中用到了fieldsgroup,所以就研究一下。fieldsgroup的機制是把declar中暴露定義的fields中的字段進行hash,然後分到不同的bolt中,開始理解有誤,所以一直跑不通。囧 直接上 在第乙個spou...
Storm 中的ack機制
一.ack原理 storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每乙個tuple的tuple樹 因為乙個tuple通過spout發出了,經過每乙個bolt處理後,會生成乙個新的tuple傳送出去 當acker 框架自啟動的task 發現乙個tuple樹已經處理完成了,它會傳...