訊息拉取在實踐過程中,有以下幾個問題需要考慮:
1、如何全量拉取訊息?
2、如何指定messagequeue從指定offset處拉取訊息?
3、如何更新messagequeue的offset標誌位?
4、pull模式下如何實現負載均衡?
consumer.updateconsumeoffset(mq, pullresult.getnextbeginoffset());
public static void main(string args) throws mqclientexception
break;
case no_matched_msg:
break;
case no_new_msg:
break;
case offset_illegal:
break;
default:
break;}}
} catch (exception e)
} consumer.shutdown();
}
public class pullmqconsumermain
break;
case no_matched_msg:
break;
case no_new_msg:
break;
case offset_illegal:
break;
default:
break;
}} catch (exception e)
consumer.shutdown();
}}
public class pullschedulemain
break;
case no_matched_msg:
break;
case no_new_msg:
break;
case offset_illegal:
break;
default:
break;
}consumer.updateconsumeoffset(mq, pullresult.getnextbeginoffset());
context.setpullnextdelaytimemillis(10000);
} catch (exception e)
}});
scheduleservice.start();
}}
分別用四個例子回答了上面的4個問題,後續會通過原始碼解析的方式描述pull模式的實現機制。
遺留問題:
呼叫fetchmessagequeuesinbalance函式時,返回messagequeue為空
RocketMQ訊息型別
普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...
RocketMQ 事務訊息
一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...
rocketMq訊息查詢
最近有人問我知道rocketmq是怎麼查詢訊息的,我發現我貌似回答不上來,所以抽空就把這塊內容補充一下,主要是講清楚根據key查詢訊息和根據msgid查詢訊息兩塊內容。看下引數列表中我們可以看到 k指出了核心key的引數,指定了根據key查詢訊息的方法,這個命令返回的是msgid,據說還有一些坑,可...