RabbitMQ 訊息冪等性問題

2021-10-06 22:55:03 字數 2900 閱讀 4454

關於mq消費者的冪等性問題,在於mq的重試機制,因為網路原因或客戶端延遲消費導致重複消費。使用mq重試機制需要注意的事項以及如何解決消費者冪等性問題以下將逐一講解。

1. rabbitmq自動重試機制

消費者在消費訊息的時候,如果消費者業務邏輯出現程式異常,這個時候我們如何處理?

使用重試機制,rabbitmq預設開啟重試機制。

實現原理:

@rabbithandler註解 底層使用aop攔截,如果程式(消費者)沒有丟擲異常,自動提交事務

如果aop使用異常通知攔截獲取到異常後,自動實現補償機制,訊息快取在rabbitmq伺服器端

注意:缺省會一直重試到消費者不拋異常為止,這樣顯然不好。我們需要修改重試機制策略,如間隔3s重試一次)

配置:spring:

rabbitmq:

# 連線位址

host: 127.0.0.1

# 埠號

port: 5672

# 賬號

username: guest

# 密碼

password: guest

# 位址(類似於資料庫的概念)

virtual-host: /admin_vhost

# 消費者監聽相關配置

listener:

******:

retry:

# 開啟消費者(程式出現異常)重試機制,預設開啟並一直重試

enabled: true

# 最大重試次數

max-attempts: 5

# 重試間隔時間(毫秒)

initial-interval: 3000

2. 如何合理選擇重試機制?

情況1: 消費者獲取到訊息後,呼叫第三方介面,但介面暫時無法訪問,是否需要重試? 需要重試,可能是因為網路原因短暫不能訪問

情況2: 消費者獲取到訊息後,丟擲資料轉換異常,是否需要重試? 不需要重試,因為屬於程式bug需要重新發布版本

總結:對於情況2,如果消費者**丟擲異常是需要發布新版本才能解決的問題,那麼不需要重試,重試也無濟於事。應該採用日誌記錄+定時任務job進行健康檢查+人工進行補償

3. 呼叫第三方介面自動實現補償機制

我們知道了,rabbitmq在消費者消費發生異常時,會自動進行補償機制,所以我們(消費者)在呼叫第三方介面時,可以根據返回結果判斷是否成功:

成功:正常消費

失敗:手動拋處乙個異常,這時rabbitmq自動給我們做重試 (補償)。

4. 如何解決消費者冪等性問題,防止重複消費 (mq重試機制需要注意的問題)

產生原因:網路延遲傳輸中,消費者出現異常或者消費者延遲消費,會造成進行mq重試補償,在重試過程中,可能會造成重複消費。

面試題:mq中消費者如何保證冪等性問題,不被重複消費?

偽**:

生產者核心**:

請求頭設定訊息id(messageid)

@component

public class fanoutproducer

}消費者核心**:

@rabbitlistener(queues = "fanout_email_queue")

public void process(message message) throws exception

system.out.println("郵件消費者獲取生產者訊息" + "messageid:" + messageid + ",訊息內容:" + msg);

jsonobject jsonobject = jsonobject.parseobject(msg);

// 獲取email引數

string email = jsonobject.getstring("email");

// 請求位址

system.out.println("執行結束....");

//① 執行到這裡已經消費成功,我們可以修改messageid的狀態,並存入日誌表(可以存到redis中,key為訊息id、value為狀態)

}5. springboot整合rabbitmq應答模式(ack)

1.修改配置******下新增 acknowledge-mode: manual:

spring:

rabbitmq:

# 連線位址

host: 127.0.0.1

# 埠號

port: 5672

# 賬號

username: guest

# 密碼

password: guest

# 位址(類似於資料庫的概念)

virtual-host: /admin_vhost

# 消費者監聽相關配置

listener:

******:

retry:

# 開啟消費者(程式出現異常)重試機制,預設開啟並一直重試

enabled: true

# 最大重試次數

max-attempts: 5

# 重試間隔時間(毫秒)

initial-interval: 3000

# 開啟手動ack

acknowledge-mode: manual

2.消費者增加**:

long deliverytag = (long) headers.get(amqpheaders.delivery_tag); 手動ack

channel.basicack(deliverytag, false);手動簽收

//郵件佇列

@component

public class fanouteamilconsumer

}

MQ的冪等性問題

冪等性問題 1 生產者已把訊息傳送到mq,在mq給生產者返回ack的時候網路中斷,故生產者未收到確定資訊,生產者認為訊息未傳送成功,但實際情況是,mq已成功接收到了訊息,在網路重連後,生產者會重新傳送剛才的訊息,造成mq接收了重複的訊息 2 消費者在消費mq中的訊息時,mq已把訊息傳送給消費者,消費...

RocketMQ解決冪等性問題

一.造成重複消費的原因 在於回饋機制。正常情況下,消費者在消費訊息時候,消費完畢後,會傳送乙個ack確認資訊給訊息佇列 broker 訊息佇列 broker 就知道該訊息被消費了,就會將該訊息從訊息佇列中刪除。不同的訊息佇列傳送的確認資訊形式不同,例如rabbitmq是傳送乙個ack確認訊息,roc...

冪等性問題和解決方法

在實際的開發專案中,乙個對外暴露的介面往往會面臨很多次請求。這就需要考慮到乙個冪等性問題。冪等性的概念是 任意多次執行所產生的影響均與一次執行的影響相同,即無論你請求了多少次,對資料庫的影響都只能有一次,不能重複處理。所以,按照上面的理解,每次執行的結果都會發生變化,就是非冪等的。如下面三條sql,...