訊息有序指的是可以按照訊息的傳送順序來消費。
rocketmq可以嚴格的保證訊息有序。但這個順序,不是全域性順序,只是分割槽(queue)順序。要全域性順序只能乙個分割槽。
之所以出現你這個場景看起來不是順序的,是因為傳送訊息的時候,訊息傳送預設是會採用輪詢的方式傳送到不通的queue(分割槽)。如圖:
而消費端消費的時候,是會分配到多個queue的,多個queue是同時拉取提交消費。如圖:
但是同一條queue裡面,rocketmq的確是能保證fifo的。那麼要做到順序訊息,應該怎麼實現呢——把訊息確保投遞到同一條queue。
這樣同一批你需要做到順序消費的肯定會投遞到同乙個queue,同乙個queue肯定會投遞到同乙個消費例項,同乙個消費例項肯定是順序拉取並順序提交執行緒池的,只要保證消費端順序消費,則大功告成!
下面用訂單進行示例。乙個訂單的順序流程是:建立、付款、推送、完成。訂單號相同的訊息會被先後傳送到同乙個佇列中,消費時,同乙個orderid獲取到的肯定是同乙個佇列。
rocketmq訊息生產端示例**如下:
/*** producer,傳送順序訊息
*/public class producer ;
// 訂單列表
listorderlist = new producer().buildorders();
date date = new date();
******dateformat sdf = new ******dateformat("yyyy-mm-dd hh:mm:ss");
string datestr = sdf.format(date);
for (int i = 0; i < 10; i++)
}, orderlist.get(i).getorderid());//訂單id
system.out.println(sendresult + ", body:" + body);
}producer.shutdown();
} catch (mqclientexception e) catch (remotingexception e) catch (mqbrokerexception e) catch (interruptedexception e)
system.in.read();
}/**
* 生成模擬訂單資料
*/private listbuildorders()
輸出:
從圖中紅色框可以看出,orderid等於15103111039的訂單被順序放入queueid等於7的佇列。queueoffset同時在順序增長。
傳送時有序,接收(消費)時也要有序,才能保證順序消費。如下這段**演示了普通消費(非有序消費)的實現方式。
/*** 普通資訊消費
*/public class consumer
try catch (exception e)
return consumeconcurrentlystatus.consume_success;
}});
consumer.start();
system.out.println("consumer started.");}}
輸出:
可見,訂單號為15103111039的訂單被消費時順序完成亂了。所以用messagelistenerconcurrently這種消費者是無法做到順序消費的,採用下面這種方式就做到了順序消費:
/*** 順序訊息消費,帶事務方式(應用可控制offset什麼時候提交)
*/public class consumerinorder
try catch (exception e)
return consumeorderlystatus.success;
}});
consumer.start();
system.out.println("consumer started.");}}
輸出:
messagelistenerorderly能夠保證順序消費,從圖中我們也看到了期望的結果。圖中的輸出是只啟動了乙個消費者時的輸出,看起來訂單號還是混在一起,但是每組訂單號之間是有序的。因為訊息傳送時被分配到了三個佇列(參見前面生產者輸出日誌),那麼這三個佇列的訊息被這唯一消費者消費。
如果啟動2個消費者呢?那麼其中乙個消費者對應消費2個佇列,另乙個消費者對應消費剩下的1個佇列。
如果啟動3個消費者呢?那麼每個消費者都對應消費1個佇列,訂單號就區分開了。輸出變為這樣:
消費者1輸出:
消費者2輸出:
消費者3輸出:
很完美,有木有?!
按照這個示例,把訂單號取了做了乙個取模運算再丟到selector中,selector保證同乙個模的都會投遞到同一條queue。即: 相同訂單號的--->有相同的模--->有相同的queue。最後就會類似這樣:
總結:rocketmq的順序訊息需要滿足2點:
rocketmq 順序消費
org.apache.rocketmq rocketmq spring boot starter 2.0.3 有序訊息需要生產者,消費者一起配合,生產者要保證每次訊息都要投遞到broker的同乙個佇列裡,消費者需要設定 關鍵點 生產這傳送 同步訊息且指定佇列 syncsendorderly 消費者指...
如何保證rocketmq消費順序
問題 我們知道訊息佇列可以在高併發的情況下,實現 削峰填谷,以及可以 非同步解偶。但是 某些業務場景下,我們需要 保證訊息是嚴格按照一定順序 去消費的,這時候我們要怎麼辦?以rocketmq為例。我們 知道 訊息從 producer 傳送到 broker 佇列中 一般是 輪訓傳送到 多個broker...
RocketMQ 如何保證訊息順序消費
rocketmq支援區域性順序消費,但不支援全域性,換句話說針對topic中的每個queue是可以按照fifo進行消費。要保證乙個訂單有關的訊息順序消費,有兩點需要注意,一是將訂單有關的訊息傳送到相關topic中同乙個queue裡,二是消費者按照先進先出的原則進行消費。在訊息傳送時,需指定對應的me...