在專案中
1.使用者確認乙個訂單,若30分鐘之類沒有支付,則需要取消訂單,若用定時任務去掃瞄訂單表,第一,定時任務時間如何定義,有存在漏掃的風險,第二,訂單表資料龐大,掃瞄表非常消耗效能,這時候該功能可以引入rabbitmq延遲佇列來做
2.某條活動通知在指定的一天推送給使用者,可以用延遲佇列
延遲佇列即傳送一條訊息給目標佇列,並非讓目標佇列立即接受到訊息,而是讓訊息等待一段延遲時間才到達目標佇列讓它消費
rabbitmq本身沒有直接支援延遲佇列功能,我們可以利用它的dxl特性來擴充套件
rabbitmq的queue可配置x-dead-letter-exchange和x-dead-letter-routing-key兩個引數,意為如果佇列中出現了dead letter,則按照這兩個引數重新路由**到指定佇列
x-dead-letter-exchange : 出現dead letter之後,將dead letter重新傳送到exchange
x-dead-letter-routing-key : 出現dead letter之後,將dead letter重新按照指定的routing-key傳送
佇列出現dead letter情況有:
由此可知,設定ttl規則之後,當訊息在乙個佇列中變成死信之後,利用dlx特性,它能重新被**到另一exchange或routing-key,從而被重新消費
在之前的文章中,我們說過,rabbitmq的訊息生成器是不直接將訊息傳送到佇列,而是傳送到交換器,交換器可以**到單個或多個佇列,由此,我們需要兩個佇列、兩個交換機
queue1: 死信佇列,將訊息傳送到死信exchange,繫結此佇列,讓訊息進入該佇列成為死信,設定ttl過期時間,該佇列沒有消費者,等待時間過期進入dead letter,當queue1佇列有死信產生時,會**到交換器x-dead-letter-exchange,以路由鍵x-dead-letter-rouuting-key**到指定queue
queue2: **佇列,也是訊息最終消費的目標佇列,此佇列需要從死信佇列接收訊息,所以需要繫結死信**到的交換器x-dead-letter-exchange
fanout sender**整合, messagepostprocessor訊息處理器是個functional介面
/**
* 延遲傳送訊息佇列,客戶端發目標佇列傳送一條延遲訊息 :
* 此時,將訊息傳送到dxl死信佇列,而非直接傳送到queuename佇列,並設定延遲時間 times 秒
* * 死信佇列沒有消費者,它用來儲存超時的訊息,並**到另一佇列,**佇列等待訊息延遲之後接收到訊息,
* 已過了times 秒,處理業務邏輯
** @param payload 訊息體
*/public void senddelaymessage(delaymessagepayload payload) , correlation);
log.info("fanout send delay message success exchange:{}, message:{}, correlationid:{}",payload.getdeadexchange(),
jsonutils.obj2json(payload), correlation.getid());
}
訊息體:
@allargsconstructor
@noargsconstructor
@data
public class delaymessagepayload implements serializable
客戶端:
delaymessagepayload payload = new delaymessagepayload(rabbitconstant.fanoutexchange.seckill_dead_letter,
rabbitconstant.fanoutqueue.seckill_forward, rabbitconstant.fanoutexchange.seckill_dead_letter,
rabbitconstant.fanoutqueue.seckill_dead_letter,
"123456789",10 * 1000);
fanoutsender.senddelaymessage(payload);
初始化佇列和交換機:
@configuration
public class seckilltimeoutpayconfig
@bean
public fanoutexchange initdeadexchange()
@bean
public binding binddead()
/*** 初始化**佇列
*/@bean
public queue initforwardqueue()
@bean
public binding bindforwardexchange()
}
消費監聽:
@rabbitlistener(queues = rabbitconstant.fanoutqueue.seckill_forward)
@component
@slf4j
public class seckilltimeoutpayreceive ", jsonutils.obj2json(payload));
}catch (exception e)
}
注意: 訊息佇列是先進先出的棧列結構,也就意味著,如果先進去的延遲時間比後進去的延遲時間要大,那麼會一直阻塞等待先進去的消費,後面的就消費不到了,需要將延遲時間一致的業務放入同一佇列即可 rabbitmq實現延遲佇列
延遲佇列應用場景 使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單。使用者註冊成功之後,需要過一段時間比如一周後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段...
RabbitMQ 延遲佇列實現
1 延遲佇列,可以通過rabbitmq自帶機制實現 ttl 死信佇列 通過設定訊息或者佇列的ttl,過期後進行訊息的投遞,從而達到delay的效果 但存在問題 1 設定佇列ttl 同乙個佇列的所有訊息從入佇列到ttl的時間,過期後會投遞到相應死信交換機。這樣如果訊息的過期時間不盡相同,會建立n個不同...
基於PHP使用rabbitmq實現訊息佇列
1.從github上面獲取amqp基於php的實現擴充套件 2.建立生產者 send.php 1 require dir protected vendor autoload.php 23 usephpamqplib connection amqpstreamconnection 4use phpam...