RocketMQ 訊息拉取Pull

2021-09-26 06:06:06 字數 1409 閱讀 3441

訊息拉取在實踐過程中,有以下幾個問題需要考慮:

1、如何全量拉取訊息?

2、如何指定messagequeue從指定offset處拉取訊息?

3、如何更新messagequeue的offset標誌位?

4、pull模式下如何實現負載均衡?

consumer.updateconsumeoffset(mq, pullresult.getnextbeginoffset());
public static void main(string args) throws mqclientexception 

break;

case no_matched_msg:

break;

case no_new_msg:

break;

case offset_illegal:

break;

default:

break;}}

} catch (exception e)

} consumer.shutdown();

}

public class pullmqconsumermain 

break;

case no_matched_msg:

break;

case no_new_msg:

break;

case offset_illegal:

break;

default:

break;

}} catch (exception e)

consumer.shutdown();

}}

public class pullschedulemain 

break;

case no_matched_msg:

break;

case no_new_msg:

break;

case offset_illegal:

break;

default:

break;

}consumer.updateconsumeoffset(mq, pullresult.getnextbeginoffset());

context.setpullnextdelaytimemillis(10000);

} catch (exception e)

}});

scheduleservice.start();

}}

分別用四個例子回答了上面的4個問題,後續會通過原始碼解析的方式描述pull模式的實現機制。

遺留問題:

呼叫fetchmessagequeuesinbalance函式時,返回messagequeue為空

RocketMQ訊息型別

普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...

RocketMQ 事務訊息

一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...

rocketMq訊息查詢

最近有人問我知道rocketmq是怎麼查詢訊息的,我發現我貌似回答不上來,所以抽空就把這塊內容補充一下,主要是講清楚根據key查詢訊息和根據msgid查詢訊息兩塊內容。看下引數列表中我們可以看到 k指出了核心key的引數,指定了根據key查詢訊息的方法,這個命令返回的是msgid,據說還有一些坑,可...