RocketMQ事務訊息思路

2021-10-24 07:45:11 字數 1957 閱讀 6991

通過訊息佇列 rocketmq 事務訊息,能達到分布式事務的最終一致。

模擬a賬戶轉賬給b賬戶操作,這個分布式事務有兩個子事務

子事務a:areducetransaction()代表a賬戶扣款

子事務b:bincreasetransaction() 代表b賬戶收款

一:向訊息佇列伺服器傳送半訊息(半訊息無法被消費),如果收到訊息佇列伺服器的ack代表傳送成功,則執行本地事務,(areducetransaction),若本地事務執行成功,會通知訊息佇列伺服器將半訊息轉為正常訊息(可以被消費),否則刪除半訊息。

@restcontroller

@log4j

public

class

controller

", message.

getmsgid()

);log.

info

("開始執行本地事務(使用者a扣款)");

return

areducetransaction

(message);}

};message.

setlocaltransactionexecuter

(localtransactionexecuter)

; message.

setshardingkey

("abc");

producer.

send

(message)

;return

"success";}

/** * 使用者a付款 賬戶餘額較少

*/transactionstatus areducetransaction

(message message)

catch

(exception e)

}}

二:在業務執行過程中,可能存在網路錯誤如上述**(斷電點),此時雖然子事務a執行成功了,但是並沒有通知訊息佇列伺服器將半訊息轉為正常訊息,導致無法執行子事務b。因此在初始化的時候需要指定回查方法,該方法會按照設定的指定時間間隔進行執行,執行一定次數後放棄。

// 回查方法

public

class

localtransactioncheckerimpl

implements

localtransactionchecker

if(redisutils.

get(message.

getmsgid()

))log.

info

("本地事務(使用者a扣款)執行失敗,通知半訊息刪除");

return transactionstatus.rollbacktransaction;

}}

收到訊息後執行第二階段的事務操作,若事務失敗則進行重試,直到操作成功或到達最大重試次數後進行後續干預。

public

class

messagelistener

implements

messagelistener

@override

public string gettopic()

@override

public string gettag()

@override

public

void

process

(consumemessage message)

/** * 使用者b收款 賬戶餘額增加

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...

RocketMQ之事務訊息

由於工作流引擎專案中,工作流引擎服務和業務服務是分開的,所以就涉及到了分布式事務的問題。綜合考慮到併發量和分布式事務的保障,最終選擇了事務訊息的方式。首先我們來介紹下本地訊息表這種方案,當訊息佇列不支援事務訊息的時候,我們可以考慮這種方案。基本流程 1 a 系統在自己本地乙個事務裡操作同時,插入一條...

RocketMQ事務訊息實現分析

這週rocketmq發布了4.3.0版本,new feature中最受關注的一點就是支援了事務訊息 今天花了點時間看了下具體的實現內容,下面是簡單的總結。通過馮嘉發布的 rocketmq 4.3正式發布,支援分布式事務 一文可以看到rocketmq採用了2pc的方案來提交事務訊息,同時增加乙個補償邏...