順序訊息的重試
對於順序訊息,當消費者消費訊息失敗後,訊息佇列 rocketmq 會自動不斷進行訊息重試(每次間隔時間為 1 秒),這時,應用會出現訊息消費被阻塞的情況。因此,在使用順序訊息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
defaultmqpushconsumer consumer =
newdefaultmqpushconsumer
("consumer_grp_04_01");
consumer.
setnamesrvaddr
("node1:9876");
consumer.
setconsumemessagebatchmaxsize(1
);consumer.
setconsumethreadmin(1
);consumer.
setconsumethreadmax(1
);// 訊息訂閱
consumer.
subscribe
("tp_demo_04"
,"*");
// 併發消費
// consumer.setmessagelistener(new messagelistenerconcurrently()
// });
// 順序消費
consumer.
setmessagelistener
(new
messagelistenerorderly()
return null;}}
);consumer.
start()
;
對於無序訊息(普通、定時、延時、事務訊息),當消費者消費訊息失敗時,您可以通過設定返回狀態達到訊息重試的結果。
無序訊息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息
重試次數
訊息佇列 rocketmq 預設允許每條訊息最多重試 16 次,每次重試的間隔時間如下
如果訊息重試 16 次後仍然失敗,訊息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條訊息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鐘之內進行 16 次重試,超過這個時間範圍訊息將不再重試投遞。
注意: 一條訊息無論重試多少次,這些重試訊息的 message id 不會改變
配置方式
消費失敗後,重試配置方式
集群消費方式下,訊息消費失敗後期望訊息重試,需要在訊息***介面的實現中明確進行配置
(三種方式任選一種)
返回 consumeconcurrentlystatus.reconsume_later; (推薦)
返回 null
丟擲異常
public
class
myconcurrentlymessagelistener
implements
messagelistenerconcurrently
}
消費失敗後,不重試配置方式
集群消費方式下,訊息失敗後期望訊息不重試,需要捕獲消費邏輯中可能丟擲的異常,最終返回consumeconcurrentlystatus.consume_success,此後這條訊息將不會再重試。
public
class
myconcurrentlymessagelistener
implements
messagelistenerconcurrently
catch
(throwable e)
//訊息處理正常,直接返回 consumeconcurrentlystatus.consume_success
return consumeconcurrentlystatus.consume_success;
}}
最大重試次數小於等於 16 次,則重試時間間隔同上表描述。
最大重試次數大於 16 次,超過 16 次的重試時間間隔均為每次 2 小時
defaultmqpushconsumer consumer =
newdefaultmqpushconsumer
("consumer_grp_04_01");
// 設定重新消費的次數
// 共16個級別,大於16的一律按照2小時重試
consumer.
setmaxreconsumetimes(20
);
注意
訊息最大重試次數的設定對相同 group id 下的所有 consumer 例項有效。
如果只對相同 group id 下兩個 consumer 例項中的其中乙個設定了
maxreconsumetimes,那麼該配置對兩個 consumer 例項均生效。
配置採用覆蓋的方式生效,即最後啟動的 consumer 例項會覆蓋之前的啟動例項的配置
獲取訊息重試次數
消費者收到訊息後,可按照如下方式獲取訊息的重試次數
public
class
myconcurrentlymessagelistener
implements
messagelistenerconcurrently
doconsumemessage
(msgs)
;return consumeconcurrentlystatus.consume_success;
}}
RocketMQ訊息型別
普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...
RocketMQ 事務訊息
一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...
rocketMq訊息查詢
最近有人問我知道rocketmq是怎麼查詢訊息的,我發現我貌似回答不上來,所以抽空就把這塊內容補充一下,主要是講清楚根據key查詢訊息和根據msgid查詢訊息兩塊內容。看下引數列表中我們可以看到 k指出了核心key的引數,指定了根據key查詢訊息的方法,這個命令返回的是msgid,據說還有一些坑,可...