在訊息傳遞過程中,如果出現傳遞失敗的情況,傳送方會執行重試,重試過程中就有可能產生重複的訊息。如果沒有對重複訊息進行處理,就可能導致系統的資料出現錯誤。
比如,乙個消費訂單訊息,統計下單金額的微服務,如果沒有正確處理重複訊息,那就會出現重複統計,導致統計結果錯誤。
一、訊息重複的情況必然存在
在mqtt協議中,給出了三種傳遞訊息時能夠提供的服務質量標準:
這個服務質量標準不僅適用於 mqtt,對所有的訊息佇列都是適用的。常用的絕大部分訊息佇列提供的服務質量都是 at least once,包括 rocketmq、rabbitmq 和 kafka。也就是說,訊息佇列很難保證訊息不重複。
注意:kafka 支援的「exactly once」和我們剛剛提到的訊息傳遞的服務質量標準「exactly once」是不一樣的,它是 kafka 提供的另外乙個特性,kafka 中支援的事務也和我們通常意義理解的事務有一定的差異。在 kafka 中,事務和 excactly once 主要是為了配合流計算使用的特性。
二、用冪等性解決重複訊息問題
冪等本來是乙個數學上的概念,它的定義是:如果乙個函式f(x)滿足:f(f(x)) = f(x),則函式f(x)滿足公尺冪等性。擴充套件到計算機領域,被用來描述乙個操作、方法或者服務。
舉例:1、在不考慮併發的情況下,「將賬戶 x 的餘額設定為 100 元」,執行一次後對系統的影響是,賬戶 x 的餘額變成了 100 元。只要提供的引數 100 元不變,那即使再執行多少次,賬戶 x 的餘額始終都是 100 元,不會變化,這個操作就是乙個冪等的操作。
2、「將賬戶 x 的餘額加 100 元」,這個操作它就不是冪等的,每執行一次,賬戶餘額就會增加 100 元,執行多次和執行一次對系統的影響(也就是賬戶的餘額)是不一樣的。
如果消費訊息的業務邏輯具備冪等性,那就不用擔心訊息重複的問題,因為同一條訊息,消費一次和消費多次對系統的影響是完全一樣的。消費多次等於消費一次。從對系統的影響結果來說:at least once + 冪等消費 = exactly once。
實現冪等操作最好的方式是,從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。
常用的設計冪等操作的方法:
(1)利用資料庫的唯一約束實現冪等
上面提到的那個不具備冪等特性的轉賬的例子:將賬戶 x 的餘額加 100 元。在這個例子中,我們可以通過改造業務邏輯,讓它具備冪等性。
首先,我們可以限定,對於每個轉賬單每個賬戶只可以執行一次變更操作,在分布式系統中,這個限制實現的方法非常多,最簡單的是我們在資料庫中建一張轉賬流水表,這個表有三個字段:轉賬單 id、賬戶 id 和變更金額,然後給轉賬單 id 和賬戶 id 這兩個字段聯合起來建立乙個唯一約束,這樣對於相同的轉賬單 id 和賬戶 id,表裡至多只能存在一條記錄。
這樣,我們消費訊息的邏輯可以變為:「在轉賬流水表中增加一條轉賬記錄,然後再根據轉賬記錄,非同步操作更新使用者餘額即可。」在轉賬流水表增加一條轉賬記錄這個操作中,由於我們在這個表中預先定義了「賬戶 id 轉賬單 id」的唯一約束,對於同乙個轉賬單同乙個賬戶只能插入一條記錄,後續重複的插入操作都會失敗,這樣就實現了乙個冪等的操作。
基於這個思路,不光是可以使用關係型資料庫,只要是支援類似「insert if not exist」語義的儲存類系統都可以用於實現冪等,比如,你可以用 redis 的 setnx 命令來替代資料庫中的唯一約束,來實現冪等消費。
(2)為更新的資料設定前置條件
給資料變更設定乙個前置條件,如果滿足條件就更新資料,否則拒絕更新資料,在更新資料的時候,同時變更前置條件中需要判斷的資料。這樣,重複執行這個操作時,由於第一次更新資料的時候已經變更了前置條件中需要判斷的資料,不滿足前置條件,則不會重複執行更新資料操作。
比如,「將賬戶 x 的餘額增加 100 元」這個操作並不滿足冪等性,我們可以把這個操作加上乙個前置條件,變為:「如果賬戶 x 當前的餘額為 500 元,將餘額加 100 元」,這個操作就具備了冪等性。對應到訊息佇列中的使用時,可以在發訊息時在訊息體中帶上當前的餘額,在消費的時候進行判斷資料庫中,當前餘額是否與訊息中的餘額相等,只有相等才執行變更操作。
但是,如果我們要更新的資料不是數值,或者我們要做乙個比較複雜的更新操作怎麼辦?用什麼作為前置判斷條件呢?更加通用的方法是,給你的資料增加乙個版本號屬性,每次更資料前,比較當前資料的版本號是否和訊息中的版本號一致,如果不一致就拒絕更新資料,更新資料的同時將版本號 +1,一樣可以實現冪等更新。
(3)記錄並檢查操作
如果上面提到的兩種實現冪等方法都不能適用於你的場景,還有一種通用性最強,適用範圍最廣的實現冪等性方法:記錄並檢查操作,也稱為「token 機制或者 guid(全域性唯一 id)機制」,實現的思路特別簡單:在執行資料更新操作之前,先檢查一下是否執行過這個更新操作。這種方法適用範圍最廣,但是實現難度和複雜度也比較高,一般不推薦使用。
具體的實現方法是,在傳送訊息時,給每條訊息指定乙個全域性唯一的 id,消費時,先根據這個 id 檢查這條訊息是否有被消費過,如果沒有消費過,才更新資料,然後將消費狀態置為已消費。
在分布式系統中,這個方法其實是非常難實現的。首先,給每個訊息指定乙個全域性唯一的 id 就是一件不那麼簡單的事兒,方法有很多,但都不太好同時滿足簡單、高可用和高效能,或多或少都要有些犧牲。更加麻煩的是,在「檢查消費狀態,然後更新資料並且設定消費狀態」中,三個操作必須作為一組操作保證原子性,才能真正實現冪等,否則就會出現 bug。
比如說,對於同一條訊息:「全域性 id 為 8,操作為:給 id 為 666 賬戶增加 100 元」,有可能出現這樣的情況:
這樣就會導致賬戶被錯誤地增加了兩次 100 元,這是乙個在分布式系統中非常容易犯的錯誤,一定要引以為戒。對於這個問題,當然我們可以用事務來實現,也可以用鎖來實現,但是在分布式系統中,無論是分布式事務還是分布式鎖都是比較難解決問題。
電商場景下,如何處理消費過程中的重複訊息?
摘要 比如乙個消費訂單訊息,統計下單金額的微服務。若不正確處理重複訊息,就會出現重複統計。那僅靠mq能保證訊息不重複嗎?訊息傳遞過程中若失敗,則傳送方會執行重試,重試就可能產生重複訊息。若不處理重複訊息,可能收穫驚喜。比如乙個消費訂單訊息,統計下單金額的微服務。若不正確處理重複訊息,就會出現重複統計...
五 如何處理訊息的重複消費
1at most once 至多一次 乙個訊息只被消費一次,不管是否成功,允許丟失一部分訊息。2at least once至少一次 一條訊息會被重複消費 3exactly once恰好一次 一條訊息只被處理一次,且必須成功,不能夠失敗 大部分訊息佇列都是符合 at least once,如rabbi...
04 如何保證訊息佇列中的訊息不被重複消費
1 面試題 如何保證訊息不被重複消費啊 如何保證訊息消費時的冪等性 2 面試官心裡分析 其實這個很常見的乙個問題,這倆問題基本可以連起來問。既然是消費訊息,那肯定要考慮考慮會不會重複消費?能不能避免重複消費?或者重複消費了也別造成系統異常可以嗎?這個是mq領域的基本問題,其實本質上還是問你使用訊息佇...