dlx,全稱為 dead-letter-exchange,可以稱之為死信交換器。當訊息在乙個佇列中變成死信(dead message)之後,它能被傳送到另乙個交換器中,這個交換器就是dlx,繫結dlx的佇列就稱之為死信佇列。
dlx 也是乙個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。當這個佇列中存在死信時,rabbitmq就會自動地將這個訊息重新發布到設定的dlx上去,進而被路由到另乙個佇列,即死信佇列。然後可以監聽這個死信佇列中的訊息進行相應的處理。
可以通過為佇列設定 x-dead-letter-exchange 引數設定 dlx,也可以通過設定
x-dead-letter-routing-key 引數為這個dlx指定路由鍵,如果沒有特殊指定,則使用原佇列的路由鍵。
4.1 測試過程
整個過程如下圖:
4.2 生產者**
<?phprequire __dir__ . '/../../../../vendor/autoload.php';
usephpamqplib\wire\amqptable;
usephpamqplib\message\amqpmessage;
usephpamqplib\exchange\amqpexchangetype;
usephpamqplib\connection\amqpstreamconnection;/**
* 死信佇列測試
* 1、建立兩個交換器 exchange.normal 和 exchange.dlx, 分別繫結兩個佇列 queue.normal 和 queue.dlx
* 2、把 queue.normal 佇列裡面的訊息配置過期時間,然後通過 x-dead-letter-exchange 指定死信交換器為 exchange.dlx
* 3、傳送訊息到 queue.normal 中,訊息過期之後流入 exchange.dlx,然後路由到 queue.dlx 佇列中,進行消費 */
//todo 更改配置
$connection = new amqpstreamconnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel();
$channel->exchange_declare('exchange.dlx', amqpexchangetype::direct, false, true
);$channel->exchange_declare('exchange.normal', amqpexchangetype::fanout, false, true
);//
設定 queue.normal 佇列中的訊息10s之後過期
$args = new
amqptable();
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.dlx');
$args->set('x-dead-letter-routing-key', 'routingkey');
$channel->queue_declare('queue.normal', false, true, false, false, false, $args
);$channel->queue_declare('queue.dlx', false, true, false, false
);$channel->queue_bind('queue.normal', 'exchange.normal');
$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');
$message = new amqpmessage('hello dlx message');
$channel->basic_publish($message, 'exchange.normal', 'rk');
$channel->close();
$connection->close();
執行生成者**之後,queue.normal 佇列會有一條訊息,如下圖:
10秒之後,訊息會過期,然後被進入 exchange.dlx, 進而路由到 queue.dlx 佇列中:
4.3、消費者**
<?phprequire __dir__ . '/../../../../vendor/autoload.php';
usephpamqplib\message\amqpmessage;
usephpamqplib\exchange\amqpexchangetype;
usephpamqplib\connection\amqpstreamconnection;
//todo 更改配置
$connection = new amqpstreamconnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel();
$channel->exchange_declare('exchange.dlx', amqpexchangetype::direct, false, true
);$channel->queue_declare('queue.dlx', false, true, false, false
);$channel->queue_bind('queue.dlx', 'exchange.dlx', 'routingkey');
function process_message($message
)$channel->basic_consume('queue.dlx', 'consumer_tag', false, false, false, false, 'process_message');
function shutdown($channel, $connection
)register_shutdown_function('shutdown', $channel, $connection
);while ($channel ->is_consuming())
執行消費者**之後,消費會從 queue.dlx 中消費掉:
RabbitMQ 實戰指南 一 延遲佇列
延遲佇列中儲存延遲訊息,延遲訊息是指當訊息被傳送到佇列中不會立即消費,而是等待一段時間後再消費該訊息。延遲佇列很多應用場景,乙個典型的應用場景是訂單未支付超時取消,使用者下單之後30分鐘內未支付成功,則把訂單取消。rabbitmq 本身沒有直接支援延遲佇列的功能,但是可以通過過期時間ttl和死信佇列...
RabbitMQ 實戰指南 一 過期時間TTL
目前有兩種方式可以設定訊息的ttl 如果兩種方法一起使用,則訊息的ttl已較小的數值為準。1.1 通過設定佇列屬性來控制訊息的ttl 在宣告佇列的時候可以通過 x message ttl 屬性來控制訊息的ttl,這個引數的單位是毫秒。如果不設定 ttl.則表示此訊息不會過期 如果將 ttl 設定為 ...
《RabbitMQ實戰指南》整理(七)網路分割槽
當乙個集 生網路分割槽時,集群會分為兩個部分或者更多,它們各自為政,互相都認為對方分區內的節點已經掛了,包括佇列 交換器及繫結等元資料的建立和銷毀都處於自身分區內,與其他分割槽無關。分割槽的引入是為了配合rabbitmq的資料一致性複製原理。一般情況下,網路分割槽都是由於單個節點的故障引起的,且通常...