對訊息服務需要了解的朋友,可以移步:
聊聊mq的使用場景
聊聊業務系統中投遞訊息到mq的幾種方式
談談mq訊息消費的幾種方式
如何確保訊息至少消費一次,確保消費者最大程度消費成功
消費者消費訊息有2中方式:
1. push方式
訊息服務接收到訊息之後,主動將訊息推送給消費者消費
2. pull方式
消費者定時從訊息服務中拉取訊息進行消費
下面我們將討論2中方式中如何確保訊息至少被消費一次。
消費的過程:
訊息服務查詢待消費的訊息列表
輪詢待訊息列表
呼叫消費者
消費者收到消費請求,執行業務處理,將處理結果返回給訊息服務
訊息服務接收到消費成功的資訊,將訊息狀態置為消費成功狀態
繼續消費下一條訊息
**一下上面需要考慮的問題:
若訊息一直消費失敗如何處理?
先說一下影響:
訊息被阻塞
訊息如果一直消費失敗,訊息服務會不斷呼叫消費者進行消費,會阻塞其他訊息的消費,直接影響到業務的正常進行.
消費失敗的原因:
**問題
這種情況不管嘗試多少次,訊息都會消費失敗,需要人工介入修復bug,這個可以依靠監控系統發現bug,同時開發進行修復。
系統執行異常
如呼叫超時、網路問題等一些不可控的因素。產生這種錯誤,繼續重試,最終會處理成功。
此處咱們只用討論訊息服務中重試機制如何設計?
系統異常情況下,可能過一段時間,系統恢復了,此時去重試,消費也就成功了。
所以我們對於消費失敗的訊息採用延遲處理的方式,可以這麼實現:
訊息中增加幾個字段用於重試:next_dispose_time【下次處理時間】、max_failure【最大允許失敗次數】、failure【當前失敗次數】,訊息入庫時:next_dispose_time=需消費的時間,max_failure = 執行最大失敗次數, failure=0;
當消費失敗時,處理過程:
計算下次處理時間(next_dispose_time),可以在當前時間上面做指數遞增,比如根據失敗次數依次在當前時間上遞增2的failure次方秒,如:
第1次失敗:當前時間 + 2秒
第2次失敗:當前時間 + 4秒
第3次失敗:當前時間 + 8秒
第4次失敗:當前時間 + 16秒
第n次失敗:當前時間 + 2的n次方秒
failure++
訊息服務查詢待消費的訊息也需要做調整:
select * from訊息表where next_dispose_time<=當前時間 and failure此時能夠最大程度保證訊息最少消費成功一次。
這種會複雜一些,為何會複雜一些,咱們先看一下常規的流程:
消費者從訊息服務中拉取訊息
本地進行處理
從訊息服務中刪除此訊息
繼續拉取下一條進行處理
如果本地一直處理失敗,那麼後面拉取到的都是同一條訊息,這條訊息直接阻塞後續訊息的消費,這種情況如何解?
咱們先分析一下出現這種問題的後果及原因:
後果:訊息被阻塞,業務無法正常執行
原因:**問題或其他異常
確保**沒問題,可以解決上面問題,及時性不夠高,線上要考慮系統的容錯能力。
遇到這種問題還是挺嚴重了,業務方都是無法接受的,一條訊息消費失敗,會影響到其他所有訊息的消費,這個我們還是得想辦法解決,可以這樣:
消費者拉取訊息
落地到本地
從訊息服務中刪除此訊息
非同步去消費本地落地的訊息
訊息先落地,然後非同步處理,本地需要有個補償的job,去處理本地消費失敗的訊息,這個可以參考push方式消費的過程。如何確保訊息不丟失?
訊息持久化,當然前提是佇列必須持久化 rabbitmq 確保永續性訊息能從伺服器重啟中恢復的方式是,將它們寫入磁碟上 的乙個持久化日誌檔案,當發布一條永續性訊息到持久交換器上時,rabbit 會在 訊息提交到日誌檔案後才傳送響應。一旦消費者從持久佇列中消費了一條持久化訊息,rabbitmq 會在持久...
訊息佇列如何確保訊息的有序性?
要想實現訊息有序,需要從 producer 和 consumer 兩方面來考慮。首先,producer 生產訊息的時候就必須要有序。然後,consumer 消費的時候,也要按順序來,不能亂。producer 有序 像 rabbitmq 這類普通的訊息系統,佇列結構簡單,producer 向佇列中傳送...
分布式訊息服務DMS如何實現死信訊息的消費
本文部分內容節選自華為雲幫助中心的分布式訊息服務 dms 服務的產品介紹 死信訊息是什麼 死信訊息是指無法被正常消費的訊息。分布式訊息服務dms支援對訊息進行異常處理。當訊息進行多次重複消費仍然失敗後,dms會將該條訊息轉存到死信佇列中,有效期為72小時,使用者可以根據需要對死信訊息進行重新消費。消...