1. 在這裡不提如何整合rabbit mq到spring。
2. 實現功能的配置都在消費者端:
3. 下面是步驟和說明
(1)在消費者端的mq配置檔案上新增,配置 關鍵**為 acknowledeg = "manual"
,意為表示該消費者的ack方式為手動(此時的queue已經和生產者的exchange通過某個routekey繫結了)
[html]
view plain
copy
<
rabbit:listener-container
connection-factory
="connectionfactory"
acknowledge
="manual"
>
<
rabbit:listener
queues
="queue_***"
ref=
"mqconsumer"
/>
<
rabbit:listener
queues
="queue_***"
ref=
"mqconsumer2"
/>
rabbit:listener-container
>
(2)新建乙個類 mqconsumer (
即對應的***
),並實現介面
channelawaremessagelistener
,實現onmessage
方法,不需要指定方法。
因為下方圖所示,springamqp中已經實現了乙個功能,如果該***已經實現了下面2個介面,則直接呼叫onmessage方法
(3)關鍵點在實現了channelawaremessagelistener的onmessage方法後,會有2個引數。
乙個是message(訊息實體),乙個是channel就是當前的通道
很多地方都沒有說清楚怎麼去手動ack,其實手動ack就是在當前channel裡面呼叫basicask的方法,並傳入當前訊息的tagid就可以了。
basicack()方法解析:
第乙個引數 deliverytag:就是接受的訊息的deliverytag,可以通過msg.getmessageproperties().getdeliverytag()獲得
第二個引數 multiple:如果為true,確認之前接受到的訊息;如果為false,只確認當前訊息。如果為true就表示連續取得多條訊息才發會確認,和計算機網路的中tcp協議接受分組的累積確認十分相似,能夠提高效率
同樣的,如果要nack或者拒絕訊息(reject)的時候,也是呼叫channel裡面的basic***方法就可以了(要指定tagid)。
注意如果拋異常或nack(並且requeue為true),訊息會重新入佇列,並且會造成消費者不斷從佇列中讀取同一條訊息的假象。
//訊息的標識,false只確認當前乙個訊息收到,true確認所有consumer獲得的訊息
channel.basicack(message.getmessageproperties().getdeliverytag(), false);
//ack返回false,並重新回到佇列,api裡面解釋得很清楚
channel.basicnack(message.getmessageproperties().getdeliverytag(), false, true);
//拒絕訊息
channel.basicreject(message.getmessageproperties().getdeliverytag(), true);
9 ACK消費者訊息確認機制
官方文件 部落格rabbitmq版本 3.8.3 amqp client版本 5.7.1 由於在rabbitmq中,傳送出去的訊息不能完全保證能夠被消費者接收到,因此需要一種消費者訊息確認機制,來為訊息從rabbitmq節點到消費者的可靠傳遞提供支援。當消費者向rabbitmq註冊之後,rabbit...
rabbitMQ 消費者ack機制與拉取模式
public static void main string args throws exception以下三種根據實際情況使用 long deliverytag envelope.getdeliverytag 訊息序列號 boolean multiple false 是否多條訊息 boolean ...
生產消費者
producer consumer model include include define buffer size 100 緩衝區數量 define max seq 200 define n consumer 10 消費者數量 define n producer 3 生產者數量 define t ...