了解一些 rabbitmq 的實現原理也是很有必要的,它可以讓你在遇到問題時能透過現象看本質。
比如乙個佇列的內部儲存其實是由5個子佇列來流轉運作的,佇列中的訊息可以有4種不同的狀態等,通過這些可以明白在使用 rabbitmq 時盡量不要有過多的訊息堆積,不然會影響整體服務的效能。
rabbitmq儲存層包含兩個部分:佇列索引和訊息儲存。
rabbitmq訊息有兩種型別:持久化訊息和非持久化訊息,這兩種訊息都會被寫入磁碟。
持久化訊息在到達佇列時寫入磁碟,同時會記憶體中儲存乙份備份,當記憶體吃緊時,訊息從記憶體中清除。這會提高一定的效能。非持久化訊息一般只存於記憶體中,當記憶體吃緊時會被換入磁碟,以節省記憶體空間。
佇列索引:rabbit_queue_index(下文簡稱index)
index維護佇列的落盤訊息的資訊,如儲存地點、是否已被交付給消費者、是否已被消費者ack等。每個佇列都有相對應的index。
index使用順序的段檔案來儲存,字尾為.idx,檔名從0開始累加,每個段檔案中包含固定的segment_entry_count條記錄,預設值是16384。每個index從磁碟中讀取訊息的時候,至少要在記憶體中維護乙個段檔案,所以設定queue_index_embed_msgs_below值得時候要格外謹慎,一點點增大也可能會引起記憶體**式增長。
訊息儲存:rabbit_msg_store(下文簡稱store)
store以鍵值的形式儲存訊息,所有佇列共享同乙個store,每個節點有且只有乙個。
從技術層面上說,store還可分為msg_store_persistent和msg_store_transient,前者負責持久化訊息的持久化,重啟後訊息不會丟失;後者負責非持久化訊息的持久化,重啟後訊息會丟失。通常情況下,兩者習慣性的被當作乙個整體。
store使用檔案來儲存,字尾為.rdq,經過store處理的所有訊息都會以追加的方式寫入到該檔案中,當該檔案的大小超過指定的限制(file_size_limit)後,將會關閉該檔案並建立乙個新的檔案以供新的訊息寫入。檔名從0開始進行累加。在進行訊息的儲存時,rabbitmq會在ets(erlang term storage)表中記錄訊息在檔案中的位置對映和檔案的相關資訊。
訊息(包括訊息頭、訊息體、屬性)可以直接儲存在index中,也可以儲存在store中。最佳的方式是較小的訊息存在index中,而較大的訊息存在store中。這個訊息大小的界定可以通過queue_index_embed_msgs_below來配置,預設值為4096b。當乙個訊息小於設定的大小閾值時,就可以儲存在index中,這樣效能上可以得到優化(可理解為資料庫的覆蓋索引和回表)。
讀取訊息時,先根據訊息的id(msg_id)找到對應儲存的檔案,如果檔案存在並且未被鎖住,則直接開啟檔案,從指定位置讀取訊息內容。如果檔案不存在或者被鎖住了,則傳送請求由store進行處理。
通常佇列由rabbit_amqqueue_process和backing_queue這兩部分組成,rabbit_amqqueue_process負責協議相關的訊息處理,即接收生產者發布的訊息、向消費者交付訊息、處理訊息的確認(包括生產端的confirm和消費端的ack)等。backing_queue是訊息儲存的具體形式和引擎,並向rabbit_amqqueue_process提供相關的介面以供呼叫。
如果訊息投遞的目的佇列是空的,並且有消費者訂閱了這個佇列,那麼該訊息會直接傳送給消費者,不會經過佇列這一步。當訊息無法直接投遞給消費者時,需要暫時將訊息存入佇列,以便重新投遞。
rabbitmq的佇列訊息有4種狀態:
訊息存入佇列後,不是固定不變的,它會隨著系統的負載在佇列中不斷流動,訊息的狀態會不斷傳送變化。持久化的訊息,索引和內容都必須先儲存在磁碟上,才會處於上述狀態中的一種,gama狀態只有持久化訊息才會有的狀態。
在執行時,rabbitmq會根據訊息傳遞的速度定期計算乙個當前記憶體中能夠儲存的最大訊息數量(target_ram_count),如果alpha狀態的訊息數量大於此值,則會引起訊息的狀態轉換,多餘的訊息可能會轉換到beta、gama或者delta狀態。區分這4種狀態的主要作用是滿足不同的記憶體和cpu需求。
對於普通沒有設定優先順序和映象的佇列來說,backing_queue的預設實現是rabbit_variable_queue,其內部通過5個子佇列q1、q2、delta、q3、q4來體現訊息的各個狀態。
消費者獲取訊息也會引起訊息的狀態轉換。當消費者獲取訊息時,首先會從q4中獲取訊息,如果獲取成功則返回。如果q4為空,則嘗試從q3中獲取訊息,系統首先會判斷q3是否為空,如果為空則返回隊列為空,即此時佇列中無訊息。如果q3不為空,則取出q3中的訊息,進而再判斷此時q3和delta中的長度,如果都為空,則可以認為 q2、delta、 q3、q4 全部為空,此時將q1中的訊息直接轉移至q4,下次直接從 q4 中獲取訊息。如果q3為空,delta不為空,則將delta的訊息轉移至q3中,下次可以直接從q3中獲取訊息。在將訊息從delta轉移到q3的過程中,是按照索引分段讀取的,首先讀取某一段,然後判斷讀取的訊息的個數與delta中訊息的個數是否相等,如果相等,則可以判定此時delta中己無訊息,則直接將q2和剛讀取到的訊息一併放入到q3中,如果不相等,僅將此次讀取到的訊息轉移到q3。
這裡就有兩處疑問,第乙個疑問是:為什麼q3為空則可以認定整個隊列為空?試想一下,如果q3為空,delta不為空,那麼在q3取出最後一條訊息的時候,delta 上的訊息就會被轉移到q3這樣與 q3 為空矛盾;如果delta 為空且q2不為空,則在q3取出最後一條訊息時會將q2的訊息併入到q3中,這樣也與q3為空矛盾;在q3取出最後一條訊息之後,如果q2、delta、q3都為空,且q1不為空時,則q1的訊息會被轉移到q4,這與q4為空矛盾。其實這一番論述也解釋了另乙個問題:為什麼q3和delta都為空時,則可以認為 q2、delta、q3、q4全部為空?
通常在負載正常時,如果消費速度大於生產速度,對於不需要保證可靠不丟失的訊息來說,極有可能只會處於alpha狀態。對於持久化訊息,它一定會進入gamma狀態,在開啟publisher confirm機制時,只有到了gamma 狀態時才會確認該訊息己被接收,若訊息消費速度足夠快、記憶體也充足,這些訊息也不會繼續走到下乙個狀態。
在系統負載較高時,訊息若不能很快被消費掉,這些訊息就會進入到很深的佇列中去,這樣會增加處理每個訊息的平均開銷。因為要花更多的時間和資源處理「堆積」的訊息,如此用來處理新流入的訊息的能力就會降低,使得後流入的訊息又被積壓到很深的佇列中,繼續增大處理每個訊息的平均開銷,繼而情況變得越來越惡化,使得系統的處理能力大大降低。
要避免流控機制觸發,服務端預設配置是當記憶體使用達到40%,磁碟空閒空間小於50m,即啟動記憶體報警,磁碟報警;報警後服務端觸發流控(flowcontrol)機制。
一般地,當發布端傳送訊息速度快於訂閱端消費訊息的速度時,佇列中堆積了大量的訊息,導致報警,就會觸發流控機制。
觸發流控機制後,rabbitmq服務端接收發布來的訊息會變慢,使得進入佇列的訊息減少;
與此同時rabbitmq服務端的訊息推送也會受到極大的影響,測試發現,服務端推送訊息的頻率會大幅下降,等待下一次推送的時間,有時等1分鐘,有時5分鐘,甚至30分鐘。
一旦觸發流控,將導致rabbitmq服務端效能惡化,推送訊息也會變得非常緩慢;
因此要做好資料設計,使得傳送速率和接收速率保持平衡,而不至於引起伺服器堆積大量訊息,進而引發流控。通過增加伺服器集群節點,增加消費者,來避免流控發生,治標不治本,而且成本高。
應對這一問題一般有3種措施:
增加prefetch_count的值,即一次傳送多條訊息給消費者,加快訊息被消費的速度。
採用multiple ack,降低處理 ack 帶來的開銷
流量控制
rabbitmq儲存模型
rabbitmq儲存和佇列結構
佇列的順序儲存結構和鏈式儲存結構
佇列 queue 是只允許在一端進行插入操作,而在另一端進行刪除操作的線性表 在隊尾進行插入操作,在對頭進行刪除操作 與棧相反,佇列是一種 先進先出 first in first out,fifo 的線性表。與棧相同的是,佇列也是一種重要的線性結構,實現乙個佇列同樣需要順序表或鍊錶作為基礎。佇列既可...
棧和佇列 佇列及其儲存結構
佇列的順序儲存結構 1 佇列 queue 是只允許在一端進行插入操作,而在另一端進行刪除操作的線性表。2 與棧相反,佇列是一種先進先出的線性表.3 實現乙個佇列同樣需要順序表或鍊錶作為基礎。佇列即可用鍊錶實現,也可以用順序表實現,而棧一般用順序表實現,佇列用鍊錶實現,簡稱鏈佇列 typedef st...
資料結構 佇列 順序儲存結構佇列 鏈式儲存結構佇列
佇列是一種只允許在一端進行插入操作,而在另外一端進行刪除操作的線性表,特徵是先進先出,包括 順序儲存結 構佇列 鏈式儲存結構佇列。重點說明 迴圈佇列和鏈隊。在佇列中front為隊頭指標 rear為隊尾指標 佇列 佇列空的條件 rear front 佇列滿的條件 rear 1 queuesize fr...