學習Rocketmq producer啟動 一

2021-12-29 21:32:57 字數 3181 閱讀 1041

學習rocketmq-producer啟動(一):最近看了下阿里rocketmq,裡面有很多的東西值得我們學習。

defaultmqproducer producer = new defaultmqproducer("cluster");

producer.setnamesrvaddr(myutils.getnamesrvaddr());

producer.start();

乙個producer最簡單的建立流程為以上幾步

new defaultmqproducer()物件並設定其groupname 設定namesrvaddr 啟動

public class defaultmqproducer extends clientconfig implements mqproducer檢視defaultmqproducer原始碼不難發現該類是繼承clientconfig ,並去實現了mqproducer介面.

clientconfig

private string namesrvaddr = system.getproperty(mixall.namesrv_addr_property,

system.getenv(mixall.namesrv_addr_env));//namesrvaddr 位址

private string clientip = remotingutil.getlocaladdress();//當前ip

private string instancename = system.getproperty("rocketmq.client.name", "default");//instancename

defaultmqproducer

private string createtopickey = mixall.default_topic;//預設topic名稱

private volatile int defaulttopicqueuenums = 4;//預設topic對了數量

private int sendmsgtimeout = 3000;//傳送超時

private int compressmsgbodyoverhowmuch = 1024 * 4;//壓縮

private int retrytimeswhensendfailed = 2;//傳送重試次數

private boolean retryanotherbrokerwhennotstoreok = false;//換別的broker

private int maxmessagesize = 1024 * 128;//最大msgbody長度

當建立乙個defaultmqproducer物件時候會去初始化clientconfig 與defaultmqproducer中一系列的引數比如從環境變數去拿namesrvaddr位址,當前客戶端的ip等.

下圖為乙個producer啟動整個時序圖.

**解讀

this.checkconfig();改方法主要是針對producergroup是否為空,是否占用預設的groupname等一些列的驗證.

this.mqclientfactory=mqclientmanager.getinstance().getandcreatemqclientinstance(this.defaultmqproducer,rpchook);此處呼叫了getandcreatemqclientinstance()方法,並把當前的producer物件,以及rpchook作為引數傳入

public mqclientinstance getandcreatemqclientinstance(final clientconfig clientconfig, rpchook rpchook) else

}return instance;

}建立乙個當前producer的clientid格式為ip@pid 如果當前客戶端不在mq客戶端例項集合中,則建立乙個例項並加入boolean registerok=mqclientfactory.registerproducer(this.defaultmqproducer.getproducergroup(), this);把當前返回客戶端例項註冊到producer集合中

this.topicpublishinfotable.put(this.defaultmqproducer.getcreatetopickey(), new topicpublishinfo());將topic放入topic集合中.createtopickey其實是在程式配置的乙個預設的topic名字.

為什麼會傳入乙個預設的topic名字呢?

應為在建立每乙個topic時,程式會去這個預設的topic作為模板,改topic以及所有建立的topic會儲存在當前使用者目錄下的store/config/topics.json檔案中.具體建立過程在後面的章節會詳細說明.

mqclientfactory.start();mqclientinstance.start()方法主要分為以下幾個部分

獲取nameservice位址 啟動client端遠端通訊 啟動各種定時

更新nameservice位址 更新從nameservice更新topic路由資訊 清理掛掉的broker,向broker傳送心跳資訊 持久化consumeroffset 調整消費執行緒池 啟動拉取訊息服務 啟動消費端負載均衡服務 再次呼叫defaultmqproducerimpl.start() 改變servicestate狀態

this.mqclientfactory.sendheartbeattoallbrokerwithlock();心跳服務

疑點

1. mqclientinstance.start()方法中最後this.defaultmqproducer.getdefaultmqproducerimpl().start(false);反過來又去呼叫一次defaultmqproducerimpl.start()方法這個我比較費解.感覺什麼都沒做,就呼叫了一次心跳服務.

2. 在rocketmq中不管是producer,還是consumer都抽象成乙個mqclientinstance,啟動時都會去呼叫mqclientinstance.start().該方法啟動的server大部分都是和consumer相關,而在producer啟動也在呼叫.

以上兩個問題我看**中比較有疑惑的,請大神們來指點

學習學習再學習

如果乙個技能足夠複雜 比如從零學程式設計 那就不要指望讀完一本書就可以打天下。多買幾本書同類的書 因為每個作者的出發點是不一樣的,哪怕對同乙個概念都有不同的解釋說明。理解知識的重要過程之一就如牛的反芻一樣,要嚼一遍 嚥下去 再吐出來 再嚼一遍 再嚥下去 所以,既然一本書可以讀幾遍,那麼同一話題多應該...

學習 學習 再學習

原本要使用vs2005開發乙個b s專案的,沒有想到只能先暫時停停了,居然跟不上技術的發展了,呵呵,一直使用delphi delphi也沒能跟上 沒有想到轉到vs2005上竟然有這麼多要學的東西,當然目的是了做乙個好的系統。最近一直在學習asp.net ajax,雖然專案停了,但是我覺得值得,有很多...

只是學習 學習 再學習

通過做 讓我學會了很多東西 什麼 flash div css html js as 雖然都只是皮毛 不過 算是了解那麼一點點吧 哈哈 我還突然發現 我的 數學和英語 進步了不少 而且還都是很實用的 比在學校的進步可快多了 那句話說的很不錯 在你了解了一些皮毛之後你會發現很多東西你都必須去學。因為少一...