佇列在資料結構中是一種線性表,從一端插入資料,然後從另一端刪除資料。本文目的不是講解各種佇列演算法,而是在應用層面講述使用佇列能解決哪些場景問題。
應用場景
非同步處理:使用佇列的乙個主要原因是進行非同步處理,比如使用者註冊成功後需要傳送註冊成功郵件/新使用者積分/優惠券等等、快取過期時先返回老的資料,然後非同步更新快取、非同步寫日誌等;通過非同步處理,可以提公升主流程響應速度,而非主流程/非重要業務可以非同步集中處理,這樣還可以將任務聚合然後批量處理;因此可以使用訊息佇列/任務佇列來進行非同步處理。
系統解耦:比如使用者成功支付完成訂單後,需要通知生產配貨系統、發票系統、庫存系統、推薦系統、搜尋系統、風控系統等進行業務處理;而未來需要新增/支援哪些業務是不清楚的,而且這些業務處理不需要實時處理、不需要強一致,只需要最終一致性即可,因此可以通過訊息佇列/任務佇列進行系統解耦。
資料同步:比如想把mysql變更的資料同步到redis、或者將mysql資料同步到mongodb、或者機房間資料同步、或者主從資料同步等,此時可以考慮使用如databus、canal、otter。使用資料匯流排佇列進行資料同步的好處是可以保證資料修改的有序性。
流量削峰:系統瓶頸一般在資料庫上,比如扣減庫存、下單等;此時可以考慮使用佇列將變更請求暫時放入佇列,通過快取+佇列暫存的方式將資料庫流量削峰;還有如秒殺系統,下單服務會是該系統的瓶頸,此時會使用佇列進行排隊和限流,從而保護下單服務。通過佇列暫存或者佇列限流來削峰。
比如減庫存,可以考慮這樣設計:
直接在redis中扣減,然後記錄下扣減日誌(fifo佇列),通過worker去同步到db。
實際佇列的應用場景還是非常多的,本文列舉了筆者遇到過比較多的場景。
緩衝區佇列
通過緩衝區佇列可以實現:批量處理、非同步處理。
任務佇列
使用任務佇列將一些不需要與主線程同步執行的任務扔到任務佇列非同步處理即可;筆者用的最多的是執行緒池任務佇列(預設linkedblockingqueue)和disruptor任務佇列(ringbuffer)。如刷資料時,將任務扔到佇列非同步處理即可,處理成功後再非同步通知使用者;還有如刪除sku操作,使用者請求時直接將任務分解並扔到佇列,非同步處理,處理成功後非同步通知使用者即可;還有如查詢聚合,將多個可並行處理的任務扔到佇列然後等待最慢的乙個返回。如果使用的是記憶體任務佇列請記住可能存在系統重啟等問題造成的資料丟失。
通過任務佇列可以實現:非同步處理、任務分解/聚合處理。
注:jdk7提供了executorservice的新的實現forkjoinpool,其提供了work-stealing機制,可以更好地提公升併發效率。
在使用executors.newfixedthreadpool時,其沒有設定佇列大小(預設integer.max_value),如果有大量任務被快取到linkedblockingqueue中等待執行緒執行,會出現gc慢等問題,造成系統響應慢甚至oom。因此在使用執行緒池時候,要指定佇列大小並設定合理的rejectedexecutionhandler;要記錄請求**的引數方便定位引發問題的源頭。
訊息佇列
筆者所在公司使用的是自研的jmq;開源的有activemq、kafka、redis。使用訊息佇列儲存各業務資料,其他系統根據需要訂閱即可。常見的模式是:點對點(乙個訊息只有乙個消費者)、發布訂閱(乙個訊息可以有多個消費者);而常用的是發布訂閱模式。
比如使用者註冊成功、修改商品資料、訂單狀態變更等都應該將變更傳送到訊息佇列,從而其他系統根據需要訂閱該訊息,然後按照自己的需求進行業務邏輯開發。
在新增新功能時,訊息消費者只需要訂閱該訊息,然後開發相應的業務邏輯,訊息生產者根本不關心你怎麼使用訊息和你做什麼業務處理。
同步呼叫,新增什麼新功能都需要到使用者系統提需求。其中乙個服務出現問題了,整個服務就不可用了。
訊息佇列,使用者系統只需要發布使用者註冊成功的訊息即可,相關系統訂閱該訊息,然後執行相關的業務邏輯。相關服務出問題不影響到註冊主流程。
通過訊息佇列可以實現:非同步處理、系統解耦。
請求佇列
請求佇列是指如在web環境下對使用者請求排隊,從而進行一些特殊控制:流量控制、請求分級、請求隔離;如將請求按照功能劃分到不同的佇列,從而使得不同的佇列出現問題後相互不影響;還可以對請求分級,一些重要請求可以優先處理(發展到一定程度應將功能物理分離);還有伺服器處理能力有限,在接近伺服器瓶頸時需要考慮限流,最簡單的限流時丟棄處理不了的請求,此時可以使用佇列進行流量控制。
資料匯流排佇列
一般訊息佇列中的訊息都是業務維度的,比如業務鍵或者業務狀態等,比如哪個sku變更了,而有些訂閱者需要再查一遍來獲取最新的修改資料(比如快取同步);通過現有的訊息佇列方式的缺點是很難只進行修改部分的推送和保證資料有序性。而此種場景比較適合使用資料匯流排佇列實現。如資料庫資料修改後需要同步資料到快取,或者需要將乙個機房資料同步到另乙個機房,只是資料維度的同步,此時應該使用資料匯流排佇列如canal、otter、databus;使用資料匯流排佇列的好處是可以保證資料的有序性。
混合佇列
在《構建需求響應式億級商品詳情頁》曾介紹過該方式的佇列,使用混合佇列來解決實際問題。
此處mq是使用京東自研的jmq,訊息是可靠持久化儲存的;應用會按照不同的維度發布訊息到jmq;下游應用接收到該訊息後會放入到redis,使用redis list來儲存這些任務;應用將redis訊息消費處理後,會按照不同的維度聚合商品訊息然後再次傳送出去。
使用redis佇列的主要原因是想提公升訊息堆積能力和併發處理能力。另外在使用redis構建訊息佇列時需要考慮網路抖動造成的訊息丟失問題,因為redis是沒有回滾事務的,或者說是確認機制。我們使用如下方式防止訊息丟失:
try而對於失敗我們會進行重試三次,重試失敗後放入失敗佇列,而失敗佇列是具有防重功能的(從本地佇列和失敗佇列排重),使用的是redis lua指令碼實現:catch
(exception e)
staticredis作者antirez開發的記憶體分布式訊息佇列disque是未來更好的記憶體訊息佇列選擇。eventqueuescript
add_to_fail_queue_redis_script
= new
eventqueuescript(
"redis.call('lrem', keys[1], 1, argv[1]) redis.call('lrem', keys[2], 1, argv[1]) return redis.call('lpush', keys[2], argv[1])"
);
其他優先順序佇列:在實際開發時肯定有些任務是緊急的,此時應該優先處理緊急的任務;所以請考慮對佇列進行分級。
副本佇列:在進行一些系統重構或者上新的功能時,如果沒有足夠的信心保證業務邏輯正確,可以考慮儲存乙份佇列的副本(比如1小時、1天的),從而當業務出現問題時可以對這些訊息進行回放。
映象佇列:每個佇列不會無限制訂閱數量,一定會有乙個極限的;當到達極限時請考慮使用映象佇列方式解決該問題。
佇列併發數:不同佇列實現,佇列服務端併發連線數是不一樣的;一定不是增大佇列併發連線數消費能力也隨著增加;也不會因為增加了消費伺服器消費併發能力也隨著增加,需要根據實際情況來設定合理的併發連線數。
推還是拉:訊息體內容不是越全越好,需要根據具體業務設計訊息體;如有些系統依賴商品變更訊息(只有乙個sku)、有些系統依賴商品狀態訊息(sku、狀態)、有些系統依賴商品屬性變更訊息(sku、變更的屬性)等,如果讓所有系統都消費商品變更訊息,那麼這些系統都會呼叫商品查詢服務拉一下最新的商品資訊然後進行處理。因此要根據實際情況來決定是使用推送方式(將系統需要的所有資訊推過去)還是拉取方式(只推送id,然後再查一遍)。
訊息合併:如果訊息寫入量非常大,應該考慮將訊息合併寫,可以"寫應用本地磁碟佇列"-->「同步本地磁碟佇列到訊息中介軟體」;同步時可以根據需求制定同步策略,如1秒同步1次
聊聊高併發系統之佇列術
佇列在資料結構中是一種線性表,從一端插入資料,然後從另一端刪除資料。本文目的不是講解各種佇列演算法,而是在應用層面講述使用佇列能解決哪些場景問題。在我開發過的系統中,不是所有的業務都必須實時處理 不是所有的請求都必須實時反饋結果給使用者 不是所有的請求 處理都必須100 處理成功 不知道誰依賴 我 ...
聊聊高併發系統之佇列術
佇列在資料結構中是一種線性表,從一端插入資料,然後從另一端刪除資料。本文目的不是講解各種佇列演算法,而是在應用層面講述使用佇列能解決哪些場景問題。在我開發過的系統中,不是所有的業務都必須實時處理 不是所有的請求都必須實時反饋結果給使用者 不是所有的請求 處理都必須100 處理成功 不知道誰依賴 我 ...
聊聊高併發系統之降級特技
降級的最終目的是保證核心服務可用,即使是有損的。而且有些服務是無法降級的 如加入購物車 結算 在進行降級之前要對系統進行梳理,看看系統是不是可以丟卒保帥 從而梳理出哪些必須誓死保護,哪些可降級 比如可以參考日誌級別設定預案 一般 比如有些服務偶爾因為網路抖動或者服務正在上線而超時,可以自動降級 警告...