1.集群消費方式
乙個consumergroup中的consumer例項平均分攤消費生產者傳送的訊息。例如某個topic有九條訊息,其中乙個consumer group有三個例項(可能是3個程序,或者3臺機器),那麼每個例項只消費其中的3條訊息,consumer不指定消費方式的話預設是集群消費的,適用於大部分訊息的業務
2.廣播消費方式
一條訊息被多個consumer消費,幾十這些consumer屬於同乙個consumergroup,訊息也會被consumergroup中的每個consumer消費一次,廣播消費中的consumergroup概念可以認為在訊息劃分層面沒有意義,適用於一些分發訊息的場景,比如我訂單下單成功了,需要通知財務系統,客服系統等等這種分發的場景,可以通過修改consumer中的messagemodel來設定消費方式為廣播消費
package cn.baocl.rocketmq.consumer;
import cn.baocl.rocketmq.processor.mqconsumemsglistenerprocessor;
import com.alibaba.rocketmq.client.consumer.defaultmqpushconsumer;
import com.alibaba.rocketmq.client.exception.mqclientexception;
import com.alibaba.rocketmq.common.consumer.consumefromwhere;
import com.alibaba.rocketmq.common.protocol.heartbeat.messagemodel;
import org.slf4j.logger;
import org.slf4j.logge***ctory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.boot.springbootconfiguration;
import org.springframework.context.annotation.bean;
import org.springframework.util.stringutils;
@springbootconfiguration
public class mqconsumerconfiguration ")
private string namesrvaddr;
@value("$")
private string groupname;
@value("$")
private int consumethreadmin;
@value("$")
private int consumethreadmax;
@value("$")
private string topics;
@value("$")
private int consumemessagebatchmaxsize;
@autowired
private mqconsumemsglistenerprocessor mqmessagelistenerprocessor;
@bean
public defaultmqpushconsumer testrocketmqconsumer() throws exception
if (stringutils.isempty(namesrvaddr))
if(stringutils.isempty(topics))
defaultmqpushconsumer consumer = new defaultmqpushconsumer(groupname);
consumer.setnamesrvaddr(namesrvaddr);
consumer.setconsumethreadmin(consumethreadmin);
consumer.setconsumethreadmax(consumethreadmax);
consumer.registermessagelistener(mqmessagelistenerprocessor);
/*** 設定consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset);
/*** 設定消費模型,集群還是廣播,預設為集群
*///廣播
consumer.setmessagemodel(messagemodel.broadcasting);
//集群
/*** 設定一次消費訊息的條數,預設為1條
*/consumer.setconsumemessagebatchmaxsize(consumemessagebatchmaxsize);
try
consumer.start();
logger.info("consumer is start !!! groupname:{},topics:{},namesrvaddr:{}",groupname,topics,namesrvaddr);
}catch (mqclientexception e),topics:{},namesrvaddr:{}",groupname,topics,namesrvaddr,e);
throw new exception(e);
}return consumer;
}}
//廣播
consumer.setmessagemodel(messagemodel.broadcasting);
//集群
五分鐘帶你玩轉mybatis(二)常用標籤
open 以什麼開始 close 以什麼結束 separator 分隔符 collection list名稱 item index名稱 引用sql標籤 typename sort 在set時候省略最後乙個符號 code itemname criteria prefix 字首覆蓋並增加其內容 suff...
五分鐘玩轉git
許多人認為git太混亂,或認為它是一種複雜的版本控制系統,其實不然,這篇文章有助於大家快速上手使用git。使用git前,需要先建立乙個倉庫 repository 您可以使用乙個已經存在的目錄作為git倉庫或建立乙個空目錄。使用您當前目錄作為git倉庫,我們只需使它初始化。git init使用我們指定...
來吧,1分鐘帶你玩轉Kafka
摘要 kafka讓人又愛又恨?來吧,一分鐘帶你玩轉它 說起kafka,許多使用者對它是又愛又恨。kafka是一種分布式的 基於發布 訂閱的訊息系統,其極致體驗讓人欲罷不能,但操心的運維 複雜的安全策略 可靠性易用性的缺失 算不上極致的效能發揮 並不豐富的訊息服務功能,仍需要使用者付出諸多的背後工作。...