最近專案裡有需求,在介面呼叫完畢後將一些訊息通過mq通知給另乙個服務,並且因為業務的原因,需要停留一分鐘再投遞到mq,另乙個團隊來消費,我本來想用rabbitmq(以下簡稱rmq)來實現,但經過和同事討論決定不用rmq來實現延時,rmq只充當訊息通知,延時在本地進行實現。本地採用乙個單機的延時佇列,是我另乙個同事寫的簡單元件,拿過來直接用就行了,把功能做完,順利上線,但是之後問題還是暴露了出來,mq消費者那邊的團隊反饋訊息有時候沒收到,於是我開始排查問題所在,確認了我業務**是沒問題的,但是看服務端的日誌顯示的確沒發出去,但是因為是本地的乙個延時佇列,訊息放在記憶體裡,我也沒法查佇列裡的具體訊息情況,也不敢下結論是本地延時佇列的問題,因為開發環境和測試環境都沒出現問題。只能先跟蹤一下,第二天把前面三天的日誌都拉下來看發現問題所在,許多例項都出現了訊息在第二天要麼就當天的很晚的時候才發出去的情況,排除了可能是因為虛擬機器沒有同步宿主機的時間的原因之後(因為延時佇列裡面是獲取的當前時間和訊息建立時間的差來判斷時間間隔),我這下基本確定這個是我同事寫的延時佇列的問題。
其實這個延時佇列邏輯很簡單,資料結構就是乙個陣列,入佇列就是把訊息放在可用位置上,到了數量滿足一定條件的時候就擴容,出佇列的時候全陣列掃瞄,當碰到到期的訊息的時候,將訊息取出。其實現在看來這個延時佇列其實設計得不是很優雅,如果取元素,需要經過很多次掃瞄大陣列,並且擴容的時候對記憶體的消耗也大,這裡**就不帖了,這個故事告訴我們,乙個元件要給別人用,必須要經過多方面專業的測試才行,這次的問題後來我和同事討論其實還是因為有些case沒有測試到導致的。另外就是,選用元件的時候最好還是選用已知的穩定的,因為經過檢驗的才是出故障可能性比較小的。
延時佇列在業務中經常會用到,比如網上買個東西,訂單生成了但是多少時間內沒支付就關閉訂單,定時邏輯等等。之前我在學rabbitmq的時候也實現過類似的功能。具體可以看rabbitmq延時佇列,今天來整理一下去設計乙個延時佇列需要些什麼並且有哪些方案。
單機延時佇列
來粗略看一下裡面帶的結構有啥
public class delayqueueextends abstractqueueimplements blockingqueue
從注釋中,可以得知這裡面的方法是用來獲取剩餘的時間的。並且這介面還是實現了比較器的介面的,所以不難推出,這裡其實就是通過堆排序,來找到最早過期的元素。也就是最先應該出佇列的元素。
簡單做個demo,是實現以下delayed的介面
public class delaytask implements delayed
@override
public long getdelay(timeunit unit)
@override
public int compareto(delayed o) else
}//getter setter
}
做個簡單的測試
public class delayqueuetest ;
for (int i = 0;i < delaytimes.length;i++)
while(!queue.isempty())
thread.sleep(1000);}}
}
執行結果
當前執行的任務編號為:1
時間間隔為:5001
當前執行的任務編號為:0
時間間隔為:10000
當前執行的任務編號為:2
時間間隔為:15000
那麼上面的方法有啥利弊呢,首先優點肯定是簡單,缺點也顯而易見,可靠性差,並且記憶體占用的問題也很明顯。
像我前面提到我公司的同事的做法中,迴圈去遍歷整個陣列去檢測訊息是否達到延時時間的方法其實只能適用於小服務並且呼叫量不大的情況,一旦像呼叫量大了起來,實際上輪詢整個陣列去檢測訊息是否達到延時時間是很低效的。那麼在這基礎上,可以採用時間輪的辦法,乙個時間輪代表乙個週期,乙個週期裡分為幾個時間間隔,每乙個時間間隔裡包含在這一分鐘內所有的定時任務,時間輪在結構上是乙個雙向鍊錶。
如圖所示
假設這裡乙個時間節點代表一分鐘,這裡乙個時間輪也就是週期為八分鐘,當當前時間到達時間節點2的時候,這說明1中的任務已經全部過期且處理完成,時間節點2對應的定時任務就開始處理。這樣做的優點是可以通過乙個執行緒監控多個定時任務,但是缺點也很明顯,就是時間顆粒度由節點的間隔決定,並且這些任務的時間間隔還需要用同樣的時間顆粒度。並且需要考慮,不在時間週期裡的任務如何處理。然後延時佇列的其他特性都還需要通過自己實現來補上。
**這裡先挖個坑,之後我補上。
分布式中的延時佇列
zset的排序功能,直接提供了很方便的解決辦法,只要我們把score設定為定時任務預計執行時間的時間戳,也就是當前時間+延時的時間,這樣排序後首先拿到的就是最早過期的,命令也很簡單,就是
zrangebyscore key min max
就可以獲取到max對應時間戳之前的所有任務。這種做法的優點是,許多功能redis都實現了,比如持久化,高可用性這些。但是缺點也有,那就是訊息的延時和我們輪詢讀redis的速度有關,獲取當前時間之前的定時任務,可能有任務離當前時間比較遠,並且訊息過多的情況下,redis本身會受一定影響
這個我在前面有寫過類似的文章。
rabbitmq延時佇列
阿里的開源訊息佇列,但我目前還沒做太多了解,在我補齊這個中介軟體的技能點的時候一塊兒補上。
實現mq延時佇列(訂單延時取消)
簡單實現mq延時佇列 1.rabbitmqconfiguration mq配置類 configuration slf4j public class rabbitmqconfiguration string host,value int port,value string username,value...
MQ如何實現訊息延時
很多時候,業務有 在一段時間之後,完成乙個工作任務 的需求。例如 滴滴打車訂單完成後,如果使用者一直不評價,48小時後會將自動評價為5星。一般來說怎麼實現這類 48小時後自動評價為5星 需求呢?常見方案 啟動乙個cron定時任務,每小時跑一次,將完成時間超過48小時的訂單取出,置為5星,並把評價狀態...
訊息佇列MQ
目錄 一 簡介 二 為什麼需要訊息佇列 mq 三 介紹 訊息佇列 message queuing 在電腦科學中,是一種程序間通訊或同一程序間不同執行緒的通訊方式。廣義上講訊息佇列是解決分布式系統中,各個功能模組間的資訊傳遞通訊方式。與檔案傳輸和rpc相比,訊息佇列具有更好的平台無關性,並能夠很好地支...