前言
由於公司提供的佇列實在太過於蛋疼而且還限制不能使用其他佇列,但為了保證資料安全性需要乙個可以有ack功能的佇列。
原生的redis中通過l/r push/pop方式來實現佇列的功能,這個當然是沒辦法滿足需求的(沒有ack功能),所以需要自己對redis的list(佇列)做個小小的調整。
大體思路為在pop時將pop出的資料放到備份的地方,當有ack請求(確認訊息被消耗)後將備份的資訊刪除掉;每次在pop前需要檢查備份佇列中有沒有過期的資料沒有ack的,如果有則push到list中後再從list中pop出來。
以下指令碼使用lua實現,只www.cppcns.com需要在執行前載入到redis中即可。
訊息本身需要包含id屬性
push沒什麼問題,原生即可(此處以lpush為例)
pop時指令碼
local not_empty = function(x)
return (type(x) == "table") and (not x.err) and (#x ~= 0)
endlocal qname = a程式設計客棧r**[1] --佇列名稱
local currenttime = ar**[2] --當前時間,這個需要從外部傳入,程式設計客棧不能使用redis自身時間,如果使用自身時間可能導致redis本身的backup在重放請求時出現不一致性
local considerasfailmaxtimespan = ar**[3] --超時時間設定,當訊息超過一定時間還沒有ack則認為此訊息需要再次入隊
local zsetname= qname ..'backup'
local hashname= qname ..'context'
local tmp = redis.call('zrangebyscore',zsetname , '-inf', tonumber(currenttime) - tonumber(considerasfailmaxtimespan), 'limit', 0, 1)
if (not_empty(tmp)) then
redis.call('zrem', zsetname, tmp[1]) --此處拿出的為訊息的唯一id
redis.call('lpush', qname, redis.call('hget', hashname, tmp[1]))
endtmp = redis.call('rpop', qname)
if (tmp) then
local msg = cjson.decode(tmp)
local id = msg['id']
redis.call('zadd', zsetname, tonumber(currenttime), id)
redis.call('hset',hashname , id, tmp)
endreturn tmp
ack時候比較簡單,只需要將指定id從set和hash中刪除即可
local key = ar**[1]
local qname=ar**[2]
redis.call('zrem', qname..'backup', key)
redis.call('hdel', qname..'context',ookehixytk key)
在程式中使用前需要顯示load這兩個指令碼,後面直接呼叫這兩個指令碼的sha值即可執行。
總結本文標題: 詳解redis是如何實現佇列訊息的ack
本文位址: /shujuku/redis/183979.html
使用Redis做過非同步佇列嗎,是如何實現的?
redis設計用來做快取的,但是由於它自身的某種特性使得它可以用來做訊息佇列。它有幾個阻塞式的api可以使用,正是這些阻塞式的api讓其有能力做訊息佇列 另外,做訊息佇列的其他特性例如fifo 先入先出 也很容易實現,只需要乙個list物件從頭取資料,從尾部塞資料即可 redis能做訊息佇列還得益於...
redis佇列實現
redis佇列,使用了list列表資料結構,lpush生成,rpop消費模式。yiic queue worker qname 是佇列的名稱,例如 current,position等等,這只是消費者,即佇列處理程式。這是乙個典行的yii的命令列程式,檔案在 path to protected comm...
詳解Python是如何實現issubclass的
使用python內建的issubclass方法很方便的檢測乙個類是否是另乙個類程式設計客棧的子類。這個是issubclass的文件 issubclass class,classinfo return true if class is a subclass direct,indirect or vir...