官方文件由於在部落格
rabbitmq
版本:3.8.3
amqp-client
版本:5.7.1
rabbitmq
中,傳送出去的訊息不能完全保證能夠被消費者接收到,因此需要一種消費者訊息確認機制,來為訊息從rabbitmq
節點到消費者的可靠傳遞提供支援。
當消費者向rabbitmq
註冊之後,rabbitmq
將通過basic.deliver
方法投遞訊息給消費者。在每一次投遞的訊息體上都會攜帶乙個delivery tag
,這個值在每隔通道上是唯一的,用來標識本次投遞。
delivery tag
值是單調遞增的正整數,64
位長度,值從1
開始,每傳送一次訊息值遞增1
,最大值為9223372036854775807
。
消費者在收到訊息後,向rabbitmq
傳送應答訊息時,帶上這個delivery tag
,告訴rabbitmq
某次訊息投遞已經成功接收。
對於消費者訊息確認機制,rabbitmq
提供了兩種確認方式:
rabbitmq
成功將訊息傳送出去(將訊息成功寫入tcp socket
)之後立即認為本次訊息投遞已經成功,不管消費者端是否成功接收到訊息並處理了本次訊息投遞。
這種訊息被傳送出去立刻被認為是成功投遞的,又被稱作fire-and-forget
。
優點:
缺點:
消費者受到訊息之後,手動呼叫basic.ack
、basic.nack
、basic.reject
方法,傳送應答訊息,當rabbitmq
受到應答訊息之後,才認為本次投遞成功。
三個函式的含義:
注意:手動確認方式的訊息投遞效率低於自動確認方式,但是能夠彌補自動確認方式的不足。
basic.reject
和basic.nack
都是用於否定確認,但是多了乙個限制:一次只能拒絕單條訊息。以上三個方法都表示訊息已經被正確投遞。但是
basic.ack
表示訊息已經被正確處理,basic.nack
、basic.reject
表示訊息沒有被正確處理,rabbitmq
中仍然需要刪除這條訊息。
例子:
// 下面的**片段演示了在消費者端如何手動確認訊息投遞
boolean autoack =
false
;channel.
basicconsume
(queuename, autoack,
"a-consumer-tag"
,new
defaultconsumer
(channel)})
;
手動確認方式也支援批量確認,這樣可以顯著減少網路流量。
批量確認是通過將multiple
引數設定為true
來實現的。這樣將會對deliverytag
以及比deliverytag
值小的訊息一同批量確認。
void
basicack
(long deliverytag,
boolean multiple)
throws ioexception;
例子:
// 下面的**片段演示了在消費者端如何手動確認訊息投遞,批量確認
boolean autoack =
false
;channel.
basicconsume
(queuename, autoack,
"a-consumer-tag"
,new
defaultconsumer
(channel)})
;
有時候消費者不能立即處理訊息,但是有其他消費者例項可以處理這個訊息。在這種情況下,可以通過basic.nack
和basic.reject
否定確認訊息。
這兩個方法是來告訴rabbitmq
,訊息我們收到了,但是沒有處理它。當rabbitmq
收到這樣的應答訊息之後,可以選擇刪除訊息也可以選擇重新投遞。這個行為由requeue
引數來控制,如果requeue
設定為true
,那麼rabbitmq
將會重新投遞這個訊息。
void
basicreject
(long deliverytag,
boolean requeue)
throws ioexception;
例子:
// 下面的例子用來說明消費者如何否定確認訊息
boolean autoack =
false
;channel.
basicconsume
(queuename, autoack,
"a-consumer-tag"
,new
defaultconsumer
(channel)})
;
使用basic.reject
只能否定確認單個訊息。但是,使用basic.nack
可以批量否定確認多條訊息。
批量否定確認也是通過將引數m
設定為true
來實現的。
void
basicnack
(long deliverytag,
boolean multiple,
boolean requeue)
throws ioexception;
例子:
// 下面的例子用來說明消費者如何批量否定確認訊息
boolean autoack =
false
;channel.
basicconsume
(queuename, autoack,
"a-consumer-tag"
,new
defaultconsumer
(channel)})
;
為了保證消費者能夠正確地處理投遞的訊息,還可以採用下面的這些措施。
只能在手動確認模式中使用。
為了避免消費者一次不能處理過多的訊息導致訊息堆積,可以通過basic.qos
設定最大的預取值。該值定義了通道上允許的最大未確認訊息的數量,一旦未確認訊息的數量達到配置值,rabbitmq
將停止在通道上傳送更多訊息,直到至少有乙個未被確認的訊息被確認。
備註:通道預取設定在basic.get(「plapi」)
中是不啟作用的,即使在手動確認模式中。
在rabbitmq
中影響吞吐量最大的引數是訊息確認模式和qos
預取值。
自動訊息確認模式或設定qos
預取值為無限雖然可以最大的提高訊息的投遞速度,但是在消費者端未及時處理的訊息的數量也將增加,從而增加消費者端記憶體的消耗,很可能使消費者端崩潰。所以以上兩種情況需要謹慎使用。
rabbitmq
官方推薦qos
預取值設定在100
到300
範圍內的值,通常能夠提供最佳的吞吐量,並且不會有使消費者奔潰的問題。
在訊息手動確認模式中,如果投遞訊息的所有通道或連線被突然關閉(包括消費者端丟失tcp
連線、消費者應用程式(程序)掛掉、通道級別的協議異常),任何已經投遞的但是沒有被消費者端確認的訊息會自動重新排隊,等待重新被投遞。
請注意,檢測消費者端的連線不可用需要一段時間才會發現,所以會有一段時間內的所有訊息被重新投遞。
因為訊息可能被重新投遞,所以必須保證消費者端的介面的冪等性。
如果客戶端對同乙個delivery tag
標記的訊息進行了多次ack
確認,這將會導致rabbitmq
通道上的乙個錯誤,例如precondition_failed - unknown delivery tag 100
。如果使用了未知的tag
,將引發相同的通道異常。
注意,必須在同乙個通道上對訊息進行確認。如果在另乙個通道上對訊息進行了確認,將會導致unknown delivery tag
的錯誤。
對於不可路由的訊息,一旦exchange
驗證訊息不會被路由到任何佇列,這條訊息將會由**來確認。
消費者手動ack
1.在這裡不提如何整合rabbit mq到spring。2.實現功能的配置都在消費者端 3.下面是步驟和說明 1 在消費者端的mq配置檔案上新增,配置 關鍵 為 acknowledeg manual 意為表示該消費者的ack方式為手動 此時的queue已經和生產者的exchange通過某個route...
RabbitMQ中的訊息確認ACK機制
我們將訊息持久化後,假如消費端出現異常,rabbitmq伺服器會將訊息快取到記憶體,當生產者傳送一直傳送訊息而消費者都沒有正常消費時訊息就會將這些訊息全部儲存在記憶體,當我們的訊息過多時,就可能導致rabbitmq伺服器記憶體洩漏,解決辦法 1.開啟ack確認機制,2.消費端設定重試機制 預設是三次...
rabbitMQ 消費者ack機制與拉取模式
public static void main string args throws exception以下三種根據實際情況使用 long deliverytag envelope.getdeliverytag 訊息序列號 boolean multiple false 是否多條訊息 boolean ...