Rocketmq訊息持久化

2021-08-20 03:18:36 字數 3463 閱讀 1565

producer send()的message最終將由broker處理,處理類為:sendmessageprocessor ,處理方法:processrequet.

public class sendmessageprocessor extends abstractsendmessageprocessor implements nettyrequestprocessor 

@override

public remotingcommand processrequest(channelhandlercontext ctx, remotingcommand request) throws remotingcommandexception {}

上述方法,並不是直接處理訊息,而是交由messagestore處理,相關**如下:

messageextbrokerinner msginner = new messageextbrokerinner();

msginner.settopic(requestheader.gettopic());

msginner.setqueueid(queueidint);

//......

putmessageresult putmessageresult =this.brokercontroller.getmessagestore().putmessage(msginner);

然而messagestore也不直接持久化訊息,轉交給 commitlog

long begintime = this.getsystemclock().now();

putmessageresult result =this.commitlog.putmessages(messageextbatch);

//持久化到磁碟,最終通過filechannel持久化到檔案

handlediskflush(result, putmessageresult, messageextbatch);

handleha(result, putmessageresult, messageextbatch);

2.cousumer

從broker讀訊息。

消費者從broker讀取訊息經由pullmessageprocessor類處理的,processrequest()方法處理請求:

remotingcommand processrequest(final channel channel, remotingcommand request, boolean brokerallowsuspend)

經過一系列的判斷處理,之後交由 messagestore:

final getmessageresult getmessageresult =

this.brokercontroller.getmessagestore().getmessage(requestheader.getconsumergroup(), requestheader.gettopic(),

requestheader.getqueueid(), requestheader.getqueueoffset(), requestheader.getmaxmsgnums(), messagefilter);

讀取訊息。

之後交由commitlog,讀出訊息,

可以看到是先從consumerqueue中獲取訊息索引,然後再從commitlog中讀取訊息內容。這些內容也是在儲存訊息的時候寫入的。

本文編寫,參考:

producer send()的message最終將由broker處理,處理類為:sendmessageprocessor ,處理方法:processrequet.

public class sendmessageprocessor extends abstractsendmessageprocessor implements nettyrequestprocessor 

@override

public remotingcommand processrequest(channelhandlercontext ctx, remotingcommand request) throws remotingcommandexception {}

上述方法,並不是直接處理訊息,而是交由messagestore處理,相關**如下:

messageextbrokerinner msginner = new messageextbrokerinner();

msginner.settopic(requestheader.gettopic());

msginner.setqueueid(queueidint);

//......

putmessageresult putmessageresult =this.brokercontroller.getmessagestore().putmessage(msginner);

然而messagestore也不直接持久化訊息,轉交給 commitlog

long begintime = this.getsystemclock().now();

putmessageresult result =this.commitlog.putmessages(messageextbatch);

//持久化到磁碟,最終通過filechannel持久化到檔案

handlediskflush(result, putmessageresult, messageextbatch);

handleha(result, putmessageresult, messageextbatch);

2.cousumer

從broker讀訊息。

消費者從broker讀取訊息經由pullmessageprocessor類處理的,processrequest()方法處理請求:

remotingcommand processrequest(final channel channel, remotingcommand request, boolean brokerallowsuspend)

經過一系列的判斷處理,之後交由 messagestore:

final getmessageresult getmessageresult =

this.brokercontroller.getmessagestore().getmessage(requestheader.getconsumergroup(), requestheader.gettopic(),

requestheader.getqueueid(), requestheader.getqueueoffset(), requestheader.getmaxmsgnums(), messagefilter);

讀取訊息。

之後交由commitlog,讀出訊息,

可以看到是先從consumerqueue中獲取訊息索引,然後再從commitlog中讀取訊息內容。這些內容也是在儲存訊息的時候寫入的。

Activemq訊息持久化

官方文件 activemq持久化相關配置 usr local apache activemq 5.11.1 conf activemq.xml 官方預設的持久化為kahadb 可以稍作調優 indexwritebatchsize 1000 journalmaxfilelength 32mb enab...

rabbitmq 訊息持久化

專案案例 channel.exchangedeclare my exchange,builtinexchangetype.topic,true durable 引數設定為 truechannel.queuedeclare my queue,true,false,false maps.newhashm...

ActiveMQ 訊息持久化

可持久化機制 兩種機制 用一句話簡單明瞭來說,就是將mq中的資料儲存乙份,資料被傳送,則將儲存的資料刪除,如果沒有傳送成功則重新傳送。先不說具體的實現,整體的思路是這樣的,如果mq是一台伺服器,我們最好做到物理上的備份,這樣在機器出問題的時候,我們能夠恢復資料。一般將資料儲存到第三方雲上。或者說另外...