學習rocketmq,先寫乙個demo演示一下看看效果。
一、服務端部署
因為只是簡單的為了演示效果,服務端僅部署單master模式 —— 乙個name server節點,乙個broker節點。主要有一下過程。
編譯之後到distribution/target/apache-rocketmq目錄,後續所有操作都是在該路徑下。
cd distribution/target/apache-rocketmq
啟動name server,檢視日誌確認啟動成功。
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
啟動broker,檢視日誌確認啟動成功。
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
name server和broker都成功啟動,伺服器就部署完成了。更詳細的參考官方文件手冊,裡面還包含在伺服器上執行producer、customer示例,這裡主要是在專案中使用。
官網手冊戳這裡:quick start
二、客戶端搭建:spring boot專案中使用
客戶端分為訊息生產者和訊息消費者,這裡通過日誌列印輸出檢視效果,為了看起來更清晰,我新建了兩個模組分別作為訊息生產者和訊息消費者。
新增依賴,在兩個模組的pom檔案中新增以下配置。
org.apache.rocketmq
rocketmq-client
4.4.0
新增producer配置類,系統啟動時讀取yml檔案的配置資訊初始化producer。集群模式下,如果在同乙個jvm中,要往多個的mq集**送訊息,則需要建立多個的producer並設定不同的instancename,預設不需要設定該引數。
複製**
@configuration
public class producerconfiguration ")
private string producergroup;
/*** namesrv位址
*/@value("$")
private string namesrvaddr;
/*** 客戶端限制的訊息大小,超過報錯,同時服務端也會限制,需要跟服務端配合使用。預設4mb
*/@value("$")
private integer maxmessagesize;
/*** 傳送訊息超時時間,單位毫秒。預設10000
*/@value("$")
private integer sendmsgtimeout;
/*** 如果訊息傳送失敗,最大重試次數,該引數只對同步傳送模式起作用。預設2
*/@value("$")
private integer retrytimeswhensendfailed;
/*** 訊息body超過多大開始壓縮(consumer收到訊息會自動解壓縮),單位位元組。預設4096
*/@value("$")
private integer compressmsgbodyoverhowmuch;
/*** 在傳送訊息時,自動建立伺服器不存在的topic,需要指定key,該key可用於配置傳送訊息所在topic的預設路由。
*/@value("$")
private string createtopickey;
@bean
public defaultmqproducer getrocketmqproducer(www.yunshenpt.com)
if (this.sendmsgtimeout != null)
if (this.retrytimeswhensendfailed != null)
if (this.compressmsgbodyoverhowmuch != null)
if (strings.isnotblank(this.createtopickey))
try ], namesrvaddr:[{}]"
, this.producergroup, this.namesrvaddr);
} catch (mqclientexception e) ", e.getmessage(), e);
}return producer;}}
複製**
@test
public void send() throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception, unsupportedencodingexception }}
複製**
新增consumer配置。
複製**
複製**
新增訊息***,監聽到新訊息後,執行對應的業務邏輯。
複製**
@component
public class consumemsglistener implements messagelistenerconcurrently
for (messageext msg : msgs) receive new messages: {}", thread.currentthread().getname(), new string(msg.getbody()));
// do something
}} catch (exception e) else }}
return consumeconcurrentlystatus.consume_success;
CentOS 7 上systemctl 的用法
我們對service和chkconfig兩個命令都不陌生,systemctl 是管 務的主要工具,它整合了chkconfig 與 service功能於一體。注 代表某個服務的名字,如http的服務名為httpd 例如在centos 7 上安裝http 啟動服務 等同於service httpd st...
CentOS 7 上systemctl 的用法
我們對service和chkconfig兩個命令都不陌生,systemctl 是管 務的主要工具,它整合了chkconfig 與 service功能於一體。systemctl is enabled iptables.service systemctl is enabled servicename.s...
Centos7上的yum命令
用過centos系統的人都知道yum命令了,主要是用於管理系統裡面的軟體 進入命令介面的方法 在centos圖形介面的任意地方右鍵 open terminal 1.安裝乙個軟體時 httpd是安裝的軟體名 yum y install httpd2.安裝多個相類似的軟體時 httpd是安裝的軟體名 y...