基於redis構建訊息佇列

2021-08-21 15:21:40 字數 2621 閱讀 5761

一般來說,訊息佇列有兩種場景:一種是發布者訂閱者模式;一種是生產者消費者模式。利用redis這兩種場景的訊息佇列都能夠實現。定義:

1、redis作為訊息中介軟體:

1)producer/consumermode:

該方式是借助redis的list結構實現的。producer呼叫redis的lpush往特定key裡塞入訊息,consumer呼叫brpop(阻塞方法)去不斷監聽該key。

2)pubsub mode:

redis 從 2.0.0 版本開始支援 pub/sub 指令。實現思想很簡單,publisher呼叫redis的publish方法往特定的channel傳送訊息,subscriber在初始化的時候要subscribe到該channel,一旦有訊息就會立即接收,否則會阻塞。

注:這種訂閱是非持久化的。但是可以通過把訂閱的訊息額外的儲存到redis中來實現持久化。

2、為什麼採用單獨的訊息佇列中介軟體?

3、實現阻塞佇列原理:

1、使用redis怎麼做訊息佇列

首先redis它的設計是用來做快取的,但是由於它自身的某種特性使得他可以用來做訊息佇列。它有幾個阻塞式的api(brpop、sub,他們都是阻塞版的)可以使用,正是這些阻塞式的api讓他有做訊息佇列的能力。

其次,訊息佇列的其他特性例如fifo也很容易實現,只需要乙個list物件從頭取資料,從尾部塞資料即可實現。

2、簡單fifo佇列:

1)一些基礎redis基礎知識的說明

redis> blpop tasklist 0

「im task 01」

這個例子使用blpop命令會阻塞方式地從tasklist列表中取頭乙個資料,最後乙個引數就是等待超時的時間。如果設定為0則表示無限等 待。另外redis存放的資料都只能是string型別,所以在任務傳遞的時候只能是傳遞字串。我們只需要簡單的將負責資料序列化成json格式的字串,然後消費者那邊再轉換一下即可。

2)實現:

上例子即使乙個最簡單的消費者,我們通過乙個無限迴圈不斷地從redis的佇列中取資料。如果佇列中沒有資料則沒有超時的阻塞在那裡,有資料則取出往下執行。

一般情況取出來是個複雜的字串,我們可能需要將其格式化後作為再傳給處理函式,但是為了簡單我們的例子就是乙個普通字串。另外例子中的處理函式不做任何處理,僅僅sleep 用來模擬耗時的操作。

我們另開乙個redis的客戶端來模擬生產者,自帶的客戶端就可以。多往tasklist 佇列裡面塞上一些資料。

隨後在消費者端便會看到這些模擬出來的任務被挨個消費掉。

3、簡單優先順序的佇列:

假設一種簡單的需求,只需要高優先順序的比低優先順序的任務率先處理掉。其他任務之間的順序一概不管,這種我們只需要在在遇到高優先順序任務的時候將它塞到佇列的前頭,而不是push到最後面即可。因為我們的佇列是使用的redis的 list,所以很容易實現。遇到高優先順序的使用rpush 遇到低優先順序的使用lpush。

隨後會看到,高優先順序的總是比低優先順序的率先執行。但是這個方案的缺點是高優先順序的任務之間的執行順序是先進後出的。

4、較完善的優先順序佇列:

1)弊端:

上例只是簡單的將高優先順序的任務塞到佇列最前面,低優先順序的塞到最後面。這樣保證不了高優先順序任務之間的順序,假設當所有的任務都是高優先順序的話,那麼他們的執行順序將是相反的。這樣明顯違背了佇列的fifo原則。

不過只要稍加改進就可以完善我們的佇列。

2)跟使用rabbitmq一樣,我們設定兩個佇列,乙個高優先順序乙個低優先順序的佇列。高優先順序任務放到高佇列中,低的放在低優先佇列中。redis和rabbitmq不同的是它可以要求佇列消費者從哪個佇列裡面先讀。

上面的**,會阻塞地從』high_task_queue』, 『low_task_queue』這兩個佇列裡面取資料,如果第乙個沒有再從第二個裡面取。所以只需要將佇列消費者做這樣的改進便可以達到目的。

通過上面的測試看到,高優先順序的會被率先執行,並且高優先順序之間也是保證了fifo的原則。這種方案我們可以支援不同階段的優先順序佇列,例如高中低三個級別或者更多的級別都可以。

5、優先順序級別很多的情況

假設有個這樣的需求,優先順序不是簡單的高中低或者0-10這些固定的級別。而是類似0-99999這麼多級別。那麼我們第三種方案將不太合適了。

雖然redis有sorted set這樣的可以排序的資料型別,看是很可惜它沒有阻塞版的介面。於是我們還是只能使用list型別通過其他方式來完成目的。

有個簡單的做法我們可以只設定乙個佇列,並保證它是按照優先順序排序號的。然後通過二分查詢法查詢乙個任務合適的位置,並通過 lset 命令插入到相應的位置。

例如佇列裡面包含著寫優先順序的任務[1, 3, 6, 8, 9, 14],當有個優先順序為7的任務過來,我們通過自己的二分演算法乙個個從佇列裡面取資料出來反和目標資料比對,計算出相應的位置然後插入到指定地點即可。

因為二分查詢是比較快的,並且redis本身也都在記憶體中,理論上速度是可以保證的。但是如果說資料量確實很大的話我們也可以通過一些方式來調優。

回想我們第三種方案,把第三種方案結合起來就會很大程度上減少開銷。例如資料量十萬的佇列,它們的優先順序也是隨機0-十萬的區間。我們可以設定 10個或者100個不同的佇列,0-一萬的優先順序任務投放到1號佇列,一萬-二萬的任務投放到2號佇列。這樣將乙個佇列按不同等級拆分後它單個佇列的資料 就減少許多,這樣二分查詢匹配的效率也會高一點。但是資料所佔的資源基本是不變的,十萬資料該佔多少記憶體還是多少。只是系統裡面多了一些佇列而已。

基於redis的延遲訊息佇列設計

需求背景 佇列設計 目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。開發前需要考慮的問題?簡單定義乙個訊息資料結構 private string topic topic private string id 自動生成 全域性惟一 snowflak...

基於redis的延遲訊息佇列設計

需求背景 佇列設計 目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。開發前需要考慮的問題?簡單定義乙個訊息資料結構 private string topic topic private string id 自動生成 全域性惟一 snowflak...

基於redis的延遲訊息佇列設計

需求背景 佇列設計 目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的mq中介軟體。開發前需要考慮的問題?簡單定義乙個訊息資料結構 private string topic topic private string id 自動生成 全域性惟一 snowflak...