抽象類中有乙個重要的方法dorebalance()
,這個方法是對外使用的。即使用的方式就是建立乙個負載均衡實現類的例項,然後呼叫它的dorebalance()方法即可進行。
幹的核心事情:
主要流程說明:
public
void
dorebalance
(final
boolean isorder)
this
.truncatemessagequeuenotmytopic()
;}
private
void
rebalancebytopic
(final string topic,
final
boolean isorder)
case clustering:
// 看當前主題當前消費者的分配到的mq是否變化了
// 下面這個函式很重要,如果消費者分配到了乙個之前沒有的mq,那麼將會建立乙個pullrequest給pullmessageservice實現對訊息的拉取,如果之前有但是現在沒有的那麼將會呼叫裡面的processqueue的droped方法來銷毀,即不再拉取訊息。
boolean changed =
this
.updateprocessqueuetableinrebalance
(topic, allocateresultset, isorder);if
(changed)
, group={}, topic={}, clientid={}, mqallsize={}, cidallsize={}, rebalanceresultsize={}, rebalanceresultset={}"
, strategy.
getname()
, consumergroup, topic,
this
.mqclientfactory.
getclientid()
, mqset.
size()
, cidall.
size()
, allocateresultset.
size()
, allocateresultset)
;this
.messagequeuechanged
(topic, mqset, allocateresultset);}
}break;}
}
關係說明:在mqclientinstance「客戶端例項工廠」裡面維護了乙個負載均衡後台服務執行緒rebalanceservice
。它裡面還維護了乙個消費者組–>消費者內部客戶端例項這樣的乙個map。每個消費者內部客戶端裡面維護了乙個負載均衡實現類。如下圖所示
呼叫流程:rebalanceservice服務執行緒的run()
方法呼叫mqclientinstance
的dorebalance()
,在此方法或遍歷消費者組–>消費者內部客戶端這個map,並呼叫每個消費者內部客戶端的dorebalance()
,此方法會呼叫負載均衡元件的dorebalance(...)
方法。
負載均衡的後台服務執行緒在mq的客戶端mqclientinstance
啟動的,它有乙個成員變數
public
class
mqclientinstance
賦初值是在它的建構函式中
public
mqclientinstance
(clientconfig clientconfig,
int instanceindex, string clientid, rpchook rpchook)
負載均衡的啟動是在它的start()方法中,呼叫了rebalanceservice
的start()
方法
public
void
start()
throws mqclientexception
定時的負載均衡就在rebalanceservice中實現了。
下面從幾個問題出發來分析其中的機制
它的類結構圖如下,可以看到它繼承自servicethread
抽象類,屬於rocketmq
中典型的建立後台服務執行緒執行方式,可參考rocketmq中服務執行緒的分析。只要啟動rebalanceservice,它就會一直在後台執行,不會停,相當於乙個定時任務。
下面是rebalanceservice中run()
方法體
public
void
run(
) log.
info
(this
.getservicename()
+" service end");
}
首先呼叫的是mqclientinstance的dorebalance()
方法,在該方法中,首先獲取的是消費者組內部客戶端map,然後遍歷這個map中的元素,並呼叫內部客戶端(mqconsumerinner)
的dorebalance()
進行負載均衡。在乙個jvm
中只存在乙個mqclientinstance
,而乙個消費者組又只存在乙個消費者內部客戶端,所以進行負載均衡就是對每個消費者組內部客戶端執行負載均衡,。
參考資料
在rocketmq中,部署多台機器上的消費者負載均衡是如何解決的?
RocketMQ消費者實踐
最近工作中用到了rocketmq,現記錄下,如何正確實現消費 防止重複消費 如何快速消費 消費失敗如何處理 重複消費會造成資料不一致等問題。所以,消費者要做到消費冪等。1 每次消費,記錄messageid 如果再次消費該message,查詢messageid是否已存在,已存在,就跳過消費 2 使用具...
RocketMQ 消費者核心配置詳解
topic 下佇列的奇偶數會影響 customer 個數裡面的消費數量 如果是4個佇列,8個訊息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均。如果 consumer 例項的數量比 message queue 的總數量還多的話,多出來的 consumer 例項將無法分到 queue,也就...
RocketMQ建立多個消費者問題分析
在乙個程序中同乙個消費組建立多個消費者會出現the consumer group groupname has been created before,specify another name please.defaultmqpushconsumer consumer1 new defaultmqpu...