org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
有序訊息需要生產者,消費者一起配合,生產者要保證每次訊息都要投遞到broker的同乙個佇列裡,消費者需要設定
關鍵點 生產這傳送 同步訊息且指定佇列 syncsendorderly
消費者指定順序消費 consumemode = consumemode.orderly
生產者
@resource
private rocketmqtemplate rocketmqtemplate;
for (int i = 0; i < 10; i++) else
}
日誌
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=0]
messagequeue [topic=string-topic, brokername=localhost.localdomain, queueid=3]
消費者1
@service
@rocketmqmessagelistener(topic = "$", consumergroup = "string_consumer",consumemode = consumemode.orderly)
public class stringconsumer implements rocketmqlistener, rocketmqpushconsumerlifecyclelistener
@override
public void preparestart(defaultmqpushconsumer defaultmqpushconsumer)
}
消費者2
@service
@rocketmqmessagelistener(topic = "$", consumergroup = "string_consumer",consumemode = consumemode.orderly)
public class stringconsumer2 implements rocketmqlistener, rocketmqpushconsumerlifecyclelistener
@override
public void preparestart(defaultmqpushconsumer defaultmqpushconsumer)
}
結果
*****==msg0
-----------msgaa1
2019-11-11 10:49:28.167 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2330e0000 cost: 7 ms
2019-11-11 10:49:28.167 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2331f0002 cost: 7 ms
-----------msgaa3
*****==msg2
2019-11-11 10:49:28.210 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233270004 cost: 0 ms
*****==msg4
2019-11-11 10:49:28.210 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2332b0006 cost: 0 ms
2019-11-11 10:49:28.212 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233300009 cost: 0 ms
-----------msgaa5
*****==msg6
2019-11-11 10:49:28.214 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d23334000c cost: 0 ms
-----------msgaa7
2019-11-11 10:49:28.216 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2333e0010 cost: 0 ms
2019-11-11 10:49:28.216 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d2333a000e cost: 0 ms
-----------msgaa9
*****==msg8
2019-11-11 10:49:28.217 info 1977 --- [messagethread_8] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233410012 cost: 0 ms
2019-11-11 10:49:28.217 info 1977 --- [messagethread_6] a.r.s.s.defaultrocketmqlistenercontainer : consume c0a8016109ca18b4aac235d233440014 cost: 0 ms
rocketmq實現順序消費
訊息有序指的是可以按照訊息的傳送順序來消費。rocketmq可以嚴格的保證訊息有序。但這個順序,不是全域性順序,只是分割槽 queue 順序。要全域性順序只能乙個分割槽。之所以出現你這個場景看起來不是順序的,是因為傳送訊息的時候,訊息傳送預設是會採用輪詢的方式傳送到不通的queue 分割槽 如圖 而...
如何保證rocketmq消費順序
問題 我們知道訊息佇列可以在高併發的情況下,實現 削峰填谷,以及可以 非同步解偶。但是 某些業務場景下,我們需要 保證訊息是嚴格按照一定順序 去消費的,這時候我們要怎麼辦?以rocketmq為例。我們 知道 訊息從 producer 傳送到 broker 佇列中 一般是 輪訓傳送到 多個broker...
RocketMQ 如何保證訊息順序消費
rocketmq支援區域性順序消費,但不支援全域性,換句話說針對topic中的每個queue是可以按照fifo進行消費。要保證乙個訂單有關的訊息順序消費,有兩點需要注意,一是將訂單有關的訊息傳送到相關topic中同乙個queue裡,二是消費者按照先進先出的原則進行消費。在訊息傳送時,需指定對應的me...