通過訊息佇列 rocketmq 事務訊息,能達到分布式事務的最終一致。
模擬a賬戶轉賬給b賬戶操作,這個分布式事務有兩個子事務
子事務a:areducetransaction()代表a賬戶扣款
子事務b:bincreasetransaction() 代表b賬戶收款
一:向訊息佇列伺服器傳送半訊息(半訊息無法被消費),如果收到訊息佇列伺服器的ack代表傳送成功,則執行本地事務,(areducetransaction),若本地事務執行成功,會通知訊息佇列伺服器將半訊息轉為正常訊息(可以被消費),否則刪除半訊息。
@restcontroller
@log4j
public
class
controller
", message.
getmsgid()
);log.
info
("開始執行本地事務(使用者a扣款)");
return
areducetransaction
(message);}
};message.
setlocaltransactionexecuter
(localtransactionexecuter)
; message.
setshardingkey
("abc");
producer.
send
(message)
;return
"success";}
/** * 使用者a付款 賬戶餘額較少
*/transactionstatus areducetransaction
(message message)
catch
(exception e)
}}
二:在業務執行過程中,可能存在網路錯誤如上述**(斷電點),此時雖然子事務a執行成功了,但是並沒有通知訊息佇列伺服器將半訊息轉為正常訊息,導致無法執行子事務b。因此在初始化的時候需要指定回查方法,該方法會按照設定的指定時間間隔進行執行,執行一定次數後放棄。
// 回查方法
public
class
localtransactioncheckerimpl
implements
localtransactionchecker
if(redisutils.
get(message.
getmsgid()
))log.
info
("本地事務(使用者a扣款)執行失敗,通知半訊息刪除");
return transactionstatus.rollbacktransaction;
}}
收到訊息後執行第二階段的事務操作,若事務失敗則進行重試,直到操作成功或到達最大重試次數後進行後續干預。
public
class
messagelistener
implements
messagelistener
@override
public string gettopic()
@override
public string gettag()
@override
public
void
process
(consumemessage message)
/** * 使用者b收款 賬戶餘額增加
RocketMQ 事務訊息
一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...
RocketMQ之事務訊息
由於工作流引擎專案中,工作流引擎服務和業務服務是分開的,所以就涉及到了分布式事務的問題。綜合考慮到併發量和分布式事務的保障,最終選擇了事務訊息的方式。首先我們來介紹下本地訊息表這種方案,當訊息佇列不支援事務訊息的時候,我們可以考慮這種方案。基本流程 1 a 系統在自己本地乙個事務裡操作同時,插入一條...
RocketMQ事務訊息實現分析
這週rocketmq發布了4.3.0版本,new feature中最受關注的一點就是支援了事務訊息 今天花了點時間看了下具體的實現內容,下面是簡單的總結。通過馮嘉發布的 rocketmq 4.3正式發布,支援分布式事務 一文可以看到rocketmq採用了2pc的方案來提交事務訊息,同時增加乙個補償邏...