以前一直有使用celery的優先順序機制(基於redis的任務佇列),一直很好奇它的實現機制,在查閱了部分資料後,決定寫這篇文章,作為總結。
使用sorted set 做優先順序佇列最大的優點是直觀明了。
zadd key score member [[score member] [score member] ...]
score 作為優先順序,member 作為相應的任務
在sorted set 中,score 小的,位於優先順序佇列的頭部,即優先順序較高
由於score 就是menber的優先順序,因此非常直觀
可以使用
multi
zrange key
00 withscores
zremrangebyrank task_list 0
0exec
來獲取任務佇列中優先順序最高的元素
zrange 用於獲取任務,zremrangebyrank 用於從訊息佇列中移除
注意:由於sorted set本身是乙個set,因此訊息佇列中的訊息不能重複,否則新加入的訊息會覆蓋以前加入的訊息
注意:對於score 相同的訊息,sorted set 會按照字典序進行排序
應該一下就能想到,list 是作為訊息佇列的最理想的選擇,但這裡使用list 實現帶優先順序的訊息佇列也可以有好幾種不同的實現方式。
2.1 準備
首先,如果我們假定訊息佇列中的訊息,從訊息佇列的右側推入(rpush),從左側取出(lpop)
那麼單個list 很容易構造成乙個fifo 佇列。但是如果優先順序只有兩級,高和低,那麼我們可以把高優先順序的訊息,使用lpush 推入佇列左側,把低優先順序的訊息,使用rpush推入到佇列右側, 這樣單個list就可以實現2級的帶優先順序的訊息佇列。
2.2 使用blpop
redis 提供了列表的阻塞式(blocking)彈出原語。
blpop key [key ...] timeout
當給定多個 key 引數時,按引數 key 的先後順序依次檢查各個列表,彈出第乙個非空列表的頭元素。
這樣我們可以建立三個佇列,high,normal, low ,分別代表高優先順序,普通優先順序,低優先順序
blpop high normal low
2.3 基於多個key 的lpop
有時候我們並不想要阻塞式的原語,那麼在業務層,我們可以在多個佇列中遍歷,查詢來獲取訊息
queue_list = ["high", "normal", "low"]
defget_msg
():for queue in queue_list:
msg = redis_db.lpop(queue)
if msg is
notnone:
return msg
return
none
rq – 非同步任務系統,從任務佇列中獲取任務的**如下。它封裝了redis的lpop 既支援阻塞式,也支援非阻塞式的獲取優先順序最高的任務。
@classmethod
deflpop
(cls, queue_keys, timeout, connection=none):
connection = resolve_connection(connection)
if timeout is
notnone: # blocking variant
if timeout == 0:
raise valueerror('rq does not support indefinite timeouts. please pick a timeout value > 0.')
result = connection.blpop(queue_keys, timeout)
if result is
none:
raise dequeuetimeout(timeout, queue_keys)
queue_key, job_id = result
return queue_key, job_id
else: # non-blocking variant
for queue_key in queue_keys:
blob = connection.lpop(queue_key)
if blob is
notnone:
return queue_key, blob
return
none
2.4 擴充套件
如果我們需要10個優先順序的訊息佇列,可以想到我們需要至少5個佇列(參考2.1)
這時候我們的訊息佇列的命名可能就需要採取某種規則
比如,原打算命名的訊息佇列的名稱為 msg_queue
那麼這5個訊息佇列就可以被命名為
msg_queue-0
msg_queue-1
msg_queue-2
msg_queue-3
msg_queue-4
如果再結合
keys pattern
我們就可以得到對任意多個優先順序支援的訊息佇列
# priority 1 ~ 10
# push message into list
defpush_message
(queue, priority, message):
num = (priority - 1) / 2
target_queue = queue + "-" + str(num)
# direct
if priority % 2 == 1:
redis_db.lpush(target_queue, message)
else:
redis_db.rpush(target_queue, message)
# fetch a message
deffetch_message
(queue):
queue_list = redis_db.keys(queue + "-?")
queue_list = sorted(queue_list)
for queue in queue_list:
msg = redis_db.lpop(queue)
if msg is
notnone:
return msg
return
none
注意:採用這種做法,同一優先順序的訊息,並不滿足fifo Redis實現優先順序佇列
title redis實現優先順序佇列 tags 基於目前系統中存在部分非同步需求,比如匯入或者新開客戶車輛匹配vin碼等 redis中使用列表作為佇列 最關鍵提供了阻塞版本的指令blpop 新建三個佇列對應高中低優先順序 比如f6car high f6car mid f6 car low 再新建d...
利用優先順序佇列實現堆疊
因為優先順序佇列是一種用來維護由一組元素構成的集合s的資料結構,執行的操作包含 對於最大優先順序佇列 insert s,x 把元素x插入s,仍然保持最大優先順序佇列 maximum s 取得最大關鍵字的值,也就是優先順序最高的 extract max s 去的最大關鍵字的值並刪除,剔除優先順序最高的...
使用Redis實現優先順序佇列
優先順序佇列是一種如先進先出佇列和堆疊資料結構的抽象資料型別。所不同的是每乙個元素關聯乙個 優先順序 優先順序高的元素比優先順序低的元素優先得到處理。本文講解如何基於redis的sorted set資料型別實現優先順序佇列。sorted set中元素關聯乙個score,可以按score有序查詢元素。...