假設乙個場景,由於我們的消費端突然全部不可用了,導致 rabbitmq 伺服器上有上萬條未處理的訊息,這時候如果沒做任何現在,隨便開啟乙個消費端客戶端,就會導致巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多的資料,就會導致消費端變得巨卡,有可能直接崩潰不可用了。所以在實際生產中,限流保護是很重要的。
rabbitmq 提供了一種 qos (服務質量保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息(通過基於 consume 或者 channel 設定 qos 的值)未被確認前,不進行消費新的訊息。關鍵**就是在宣告消費者**裡面的
void basicqos(unit prefetchsize , ushort prefetchcount, bool global )
複製**
prefetchsize:0
prefetchcount:會告訴 rabbitmq 不要同時給乙個消費者推送多於 n 個訊息,即一旦有 n 個訊息還沒有 ack,則該 consumer 將 block 掉,直到有訊息 ack
global:true、false 是否將上面設定應用於 channel,簡單點說,就是上面限制是 channel 級別的還是 consumer 級別
備註:prefetchsize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchcount 在 no_ask=false 的情況下才生效,即在自動應答的情況下這兩個值是不生效的。
**演示:
複製**
生產端**基本沒變化,改了 exchange 和 routingkey 而已
public class procuder ", msg + i);
channel.basicpublish(consumer.exchange_name, consumer.routing_key, true, null, (msg + i).getbytes());
} }}
複製**
消費端**需要修改一下autoack 設定為 false **
增加 ** channel.basicqos(0, 1, false);
完整的消費端**如下
/**
* 使用自定義消費者
*/public class consumer
}複製**
自定義消費者
public class myconsumer extends defaultconsumer
@override
public void handledelivery(string consumertag, //消費者標籤
envelope envelope,
amqp.basicproperties properties,
byte body) throws ioexception
}複製**
然後啟動消費端,上管控臺檢視 test_qos_exchange 和 test_qos_queue 是否生成了
確認 test_qos_exchange 上繫結了 test_qos_queue
啟動生產端傳送 5 條訊息
發現消費端只列印了一條訊息
從管控台上也看到總共 5 條訊息,有 4 條等待著,一條消費了但是沒有 ack 回去
修改自定義消費者裡面的**,如下所示
public class myconsumer extends defaultconsumer
@override
public void handledelivery(string consumertag, //消費者標籤
envelope envelope,
amqp.basicproperties properties,
byte body) throws ioexception
}複製**
重啟消費端,看到消費端就按照一條一條消費,並且 ack 回去了
如上所示就是簡單的rabbitmq消費端的限流策略
消費端限流
什麼是消費端的限流 假設乙個場景,首先我們rabbitmq伺服器上面有上萬條沒有處理的訊息,我們隨便開啟乙個消費者客戶端,會出現下面情況 巨量訊息瞬間全部推送過來,但是我們當個客戶端沒有辦法進行處理這麼多的資料,可能會造成伺服器宕機。rabbitmq提供一種qos 服務質量保證 功能,即在非自動確認...
大廠如何用RabbitMQ做消費端限流
假設rabbitmq伺服器有上萬條未處理訊息,隨便開啟乙個消費端,會造成巨量訊息瞬間全部推送過來,然而我們單個客戶端無法同時處理這麼多資料。還比如說單個pro一分鐘產生了幾百條資料,但是單個con一分鐘可能只能處理60條,這時pro con不平衡。通常pro沒辦法做限制,所以con就需要做一些限流措...
消費端ACK和消費端限流
rabbitmq提供了一種qos 服務質量保證 功能,即在非自動確認訊息的前提下,如果一定數目的訊息 通過consumer或者channel設定qos的值 未被確認前,不進行消費新的訊息.自動簽收要設定成false,建議實際工作中也設定成false void basicqos int prefetc...