我理解的RocketMQ 消費者負載均衡的實現

2021-10-25 16:12:26 字數 3566 閱讀 8097

抽象類中有乙個重要的方法dorebalance(),這個方法是對外使用的。即使用的方式就是建立乙個負載均衡實現類的例項,然後呼叫它的dorebalance()方法即可進行。

幹的核心事情:

主要流程說明:

public

void

dorebalance

(final

boolean isorder)

this

.truncatemessagequeuenotmytopic()

;}

private

void

rebalancebytopic

(final string topic,

final

boolean isorder)

case clustering:

// 看當前主題當前消費者的分配到的mq是否變化了

// 下面這個函式很重要,如果消費者分配到了乙個之前沒有的mq,那麼將會建立乙個pullrequest給pullmessageservice實現對訊息的拉取,如果之前有但是現在沒有的那麼將會呼叫裡面的processqueue的droped方法來銷毀,即不再拉取訊息。

boolean changed =

this

.updateprocessqueuetableinrebalance

(topic, allocateresultset, isorder);if

(changed)

, group={}, topic={}, clientid={}, mqallsize={}, cidallsize={}, rebalanceresultsize={}, rebalanceresultset={}"

, strategy.

getname()

, consumergroup, topic,

this

.mqclientfactory.

getclientid()

, mqset.

size()

, cidall.

size()

, allocateresultset.

size()

, allocateresultset)

;this

.messagequeuechanged

(topic, mqset, allocateresultset);}

}break;}

}

關係說明:在mqclientinstance「客戶端例項工廠」裡面維護了乙個負載均衡後台服務執行緒rebalanceservice。它裡面還維護了乙個消費者組–>消費者內部客戶端例項這樣的乙個map。每個消費者內部客戶端裡面維護了乙個負載均衡實現類。如下圖所示

呼叫流程:rebalanceservice服務執行緒的run()方法呼叫mqclientinstancedorebalance(),在此方法或遍歷消費者組–>消費者內部客戶端這個map,並呼叫每個消費者內部客戶端的dorebalance(),此方法會呼叫負載均衡元件的dorebalance(...)方法。

負載均衡的後台服務執行緒在mq的客戶端mqclientinstance啟動的,它有乙個成員變數

public

class

mqclientinstance

賦初值是在它的建構函式中

public

mqclientinstance

(clientconfig clientconfig,

int instanceindex, string clientid, rpchook rpchook)

負載均衡的啟動是在它的start()方法中,呼叫了rebalanceservicestart()方法

public

void

start()

throws mqclientexception

定時的負載均衡就在rebalanceservice中實現了。

下面從幾個問題出發來分析其中的機制

它的類結構圖如下,可以看到它繼承自servicethread抽象類,屬於rocketmq中典型的建立後台服務執行緒執行方式,可參考rocketmq中服務執行緒的分析。只要啟動rebalanceservice,它就會一直在後台執行,不會停,相當於乙個定時任務。

下面是rebalanceservice中run()方法體

public

void

run(

) log.

info

(this

.getservicename()

+" service end");

}

首先呼叫的是mqclientinstance的dorebalance()方法,在該方法中,首先獲取的是消費者組內部客戶端map,然後遍歷這個map中的元素,並呼叫內部客戶端(mqconsumerinner)dorebalance()進行負載均衡。在乙個jvm中只存在乙個mqclientinstance,而乙個消費者組又只存在乙個消費者內部客戶端,所以進行負載均衡就是對每個消費者組內部客戶端執行負載均衡,。

參考資料

在rocketmq中,部署多台機器上的消費者負載均衡是如何解決的?

RocketMQ消費者實踐

最近工作中用到了rocketmq,現記錄下,如何正確實現消費 防止重複消費 如何快速消費 消費失敗如何處理 重複消費會造成資料不一致等問題。所以,消費者要做到消費冪等。1 每次消費,記錄messageid 如果再次消費該message,查詢messageid是否已存在,已存在,就跳過消費 2 使用具...

RocketMQ 消費者核心配置詳解

topic 下佇列的奇偶數會影響 customer 個數裡面的消費數量 如果是4個佇列,8個訊息,4個節點則會各消費2條,如果不對等,則負載均衡會分配不均。如果 consumer 例項的數量比 message queue 的總數量還多的話,多出來的 consumer 例項將無法分到 queue,也就...

RocketMQ建立多個消費者問題分析

在乙個程序中同乙個消費組建立多個消費者會出現the consumer group groupname has been created before,specify another name please.defaultmqpushconsumer consumer1 new defaultmqpu...