前面介紹了訊息的傳送,這節主要介紹訊息的儲存。這裡只關注普通資訊,事務訊息在後面介紹。
一般有分布式kv、檔案系統、db等,不同的mq根據設計的不同有各自的選擇,rocketmq同kafka一樣,選擇了檔案系統作為儲存方式,在儲存設計上借鑑了kafka。
kafka將訊息用topic+partition分割成不同的檔案,在topic和partition數量較多的情況下,server端檔案的順序讀寫會變成隨機讀寫,效能嚴重下降。rocketmq則將所有topic的訊息全部儲存在乙個檔案中,這樣,對於發訊息的寫檔案操作來說,是完全的順序寫,效能不會因為topic數量產生影響。但是對於消費訊息的讀檔案操作來說,則變為了完全隨機的讀操作,並且儲存檔案中會存在大量不屬於本topic的訊息。為了解決這個問題,rocketmq在原始儲存檔案的基礎上,為每乙個topic建立了訊息索引檔案,消費訊息時先讀索引檔案,再根據索引位置讀取原始檔案中的訊息。這樣,訊息讀取就變為順序的了,不過卻額外增加了一次檔案讀操作。
儲存結構如下:
producer傳送訊息時,broker將訊息儲存到commitlog中,然後將commitlog的內容dispatch到對應topic的consumequeue中,即上面說的,所有topic訊息儲存在乙個commitlog中,每個topic有自己單獨的consumequeue。indexfile作為訊息的索引檔案,同理。
consumer消費訊息時,先從對應topic的consumequeue中讀取訊息的偏移量,並對訊息做初步過濾。之後從commitlog中讀取原始訊息。
這裡簡單說下rocketmq用到的page cache和mmap。
page cache的詳細內容請自行google。簡單來說就是對於linux系統所有的檔案io請求,作業系統都是通過page cache機制實現的,磁碟檔案都是有一系列固定大小的資料塊(4k,8k等)組成的。page cache從磁碟讀取檔案時,會進行預讀取,即讀入請求頁面和緊隨其後的幾個頁面,從而提高page cache命中率。rocketmq的consumequeue檔案儲存資料較少,並且是順序讀取,在page cache的加持下consumequeue的讀取效能會比較高,可以看做近乎於記憶體。
儲存從上往下一共分為5個層次,如圖
後面按照分層進行介紹。
業務處理層包含三個物件querymessageprocessor、sendmessageprocessor、pullmessageprocessor,分別進行分析。
查詢訊息,包含兩種請求的處理,querymessage() 和 viewmessagebyid(),具體邏輯委託給了defaultmessagestore進行操作,後面針對defaultmessagestore詳細分析。
處理傳送訊息,對於事務訊息,委託給transactionalmessageserviceimpl 處理,非事務訊息委託給defaultmessagestore處理。
拉取訊息,委託給defaultmessagestore處理。
RocketMQ訊息儲存
分布式佇列因為有高可靠性的要求,所以資料要進行持久化儲存。訊息生成者傳送訊息 mq收到訊息,將訊息進行持久化,在儲存中新增一條記錄 返回ack給生產者 mq push訊息給對應的消費者,然後等待消費者返回ack 如果訊息消費者在指定時間內成功返回ack,那麼mq認為訊息消費成功,在儲存中刪除訊息,即...
RocketMQ 訊息儲存
訊息儲存 主要的儲存檔案 1 訊息檔案 commitlog 2 訊息消費佇列檔案 consumequeue 3 hash索引檔案 indexfile 4 檢測點檔案 checkpoint 5 關閉異常檔案 abort 檔案刷盤機制 同步刷寫 訊息追加到記憶體後,立即將記憶體訊息刷寫到磁碟,再對客戶端...
RocketMQ 訊息儲存結構
引用 rmq採用順序寫,隨機讀的設計理念 commitlog順序寫,可以大大提高寫人效率。雖然是隨機讀,但是利用作業系統的pagecache機制,可以批量地從磁 盤讀取,作為cache存到記憶體中,加速後續的讀取速度。rocketmq訊息的儲存是由consumequeue和commitlog配合完成...