kafka作為乙個訊息中介軟體系統,面臨的首要問題就是訊息如何持久化,如何方便地進行讀寫和解析。本文將就kafka的訊息儲存問題開乙個頭,後續將會對重要的**部分一一講解。kafka的訊息概念,首先我們在此談論的不是網路傳遞中的訊息,而更多偏向於記錄的意思,也就是消費者和生產者所在意的實際物件。訊息是kafka造作的最小單元,並不允許更改訊息的實際內容,一條訊息本質上是乙個鍵值可預設的鍵值對。
下面依次列出每一部分
* 1. 4 byte crc32 校檢值
* 2. 1 byte "magic" 識別符號來顯示訊息格式是否發生了改動,值為0/1(也可以看作是版本號)
* 3. 1 byte "attributes" 識別符號,包含以下內容:
* bit 0 ~ 2 : 壓縮編碼方式
* 0 : no compression
* 1 : gzip
* 3 : lz4
* bit 3 : 時間戳型別
* 0 : create time
* bit 4 ~ 7 : 保留部分
* 4. (可選) 8 byte 時間戳,只有magic為1時才攜帶該部分
* 5. 4 byte key length, 指定key部分的長度
* 6. k byte key
* 7. 4 byte payload length, 指定值的長度
* 8. v byte payload
kafka的訊息格式在設計上允許多重巢狀,這種巢狀是通過壓縮實現的。試想一下,某個訊息的key為空,然後它的value部分是乙個壓縮後的messageset,那麼經過解壓縮並讀取後它就是乙個鍵值對集合了,這有些類似於json了。但實際上kafka的訊息只允許二重巢狀,這並非由其訊息格式的侷限性決定,而是考慮到讀取訊息時解析的複雜度決定。巢狀訊息使得kafka傳遞複雜型別的物件成為可能,但是出於效能因素的考慮,物件序列化和過於複雜的資料格式並不適合訊息系統這一業務,或者說kafka在效能方面和表達能力上做了乙個漂亮的妥協。
同時,我們還要重點來談一談時間戳相關,時間戳有三種取值,分別是-1代表不帶時間戳,0代表該訊息建立的時間,1代表它持久化時間(也可以理解為入庫被kafka處理的時間)。對於巢狀的訊息來說,若我們選擇時間戳型別為入庫時間,則被壓縮訊息的時間戳和其外層訊息一致;若我們選擇時間戳型別為建立時間,則應該從位元組碼流中讀取;若magic值為0.則不管如何,都應該認為時間戳為-1,時間戳型別為create_time。
雖然看起來訊息格式好像比較簡單,但實際上**卻相對有些複雜,最重要的問題是1、要相容magic為0的情況,2、要能為後續的版本公升級留出擴充套件。我們首先思考一下,message類應該具有哪些功能呢大致分為下面幾個部分:
預定義的變數
由於操作的是bytes,大量的取值需要位操作,所以我們應該預定義好一些位操作輔助變數和一些重要的偏移位置。這裡面有幾個比較重要的預定義變數需要著重強調一下:
下面上**
/**
* specifies the mask for the compression code. 3 bits to hold the compression codec.
* 0 is reserved to indicate no compression
*/val compressioncodemask: int = 0x07
/*** specifies the mask for timestamp type. 1 bit at the 4th least significant bit.
*/val timestamptypemask: byte = 0x08
val timestamptypeattributebitoffset: int = 3
public byte updateattributes(byte attributes)
public static timestamptype forattributes(byte attributes)
def compressioncodec: compressioncodec =
compressioncodec.getcompressioncodec(buffer.get(attributesoffset) & compressioncodemask)
各個屬性的get方法
這個不用多說,需要注意的就是magic的值不同,有些取值的偏移位置不一樣,所以需要事先寫好靜態方法快捷獲得不同magic下的位置偏移。
合法性檢查
主要檢查以下幾個方面:
不同magic值下的message相互轉換
1. 計算新message需要的空間大小並分配
2. 寫入新的magic值
3. 取原來的attribute並用設定的timestamptype更新它,然後寫入attribute值
4. 若是0->1則寫入時間戳
5.寫入原來的訊息體
6.計算新的crc值並填充
一系列的構造方法
主要的構造途徑有兩條:
下面就依次介紹每個類的作用
RocketMQ訊息儲存一 概覽
前面介紹了訊息的傳送,這節主要介紹訊息的儲存。這裡只關注普通資訊,事務訊息在後面介紹。一般有分布式kv 檔案系統 db等,不同的mq根據設計的不同有各自的選擇,rocketmq同kafka一樣,選擇了檔案系統作為儲存方式,在儲存設計上借鑑了kafka。kafka將訊息用topic partition...
kafka 訊息的儲存分析
為了規避隨機讀寫帶來的時間消耗,kafka採用順序寫的方式儲存資料。即使是這樣,但是i o操作仍然會造成磁碟的效能瓶頸,所以kafka還有乙個效能策略。一般應用程式有乙個buffer空間在使用者空間中,來自於網路或者磁碟,無論來自網路或者磁碟,都需要通過核心,也就是說核心中也要有buffer。1 磁...
Kafka 日誌訊息儲存時間
分段策略屬性 屬性名含義 預設值log.roll.日誌滾動的週期時間,到達指定週期時間時,強制生成乙個新的segment 168 7day log.segment.bytes 每個segment的最大容量。到達指定容量時,將強制生成乙個新的segment 1g 1為不限制 log.retention...