訊息處理流程中,如果consumer的消費速度跟不上producer的傳送速度,mq中未處理的訊息會越來越多(進的多出的少),這部分訊息就被稱為堆積訊息。訊息出現堆積進而會造成訊息的消費延遲。 以下場景需要重點關注訊息堆積和消費延遲問題:
consumer使用長輪詢pull模式消費訊息時,分為以下兩個階段:
1.訊息拉取
consumer通過長輪詢pull模式批量拉取的方式從服務端獲取訊息,將拉取到的訊息快取到本地緩衝佇列中。對於拉取式消費,在內網環境下會有很高的吞吐量,所以這一階段一般不會成為訊息堆積的瓶頸。
乙個單執行緒單分割槽的低規格主機(consumer,4c8g),其可達到幾萬的tps。如果是多個分割槽多 個執行緒,則可以輕鬆達到幾十萬的tps。2. 訊息消費consumer將本地快取的訊息提交到消費執行緒中,使用業務消費邏輯對訊息進行處理,處理完畢後獲取 到乙個結果。這是真正的訊息消費過程。此時consumer的消費能力就完全依賴於訊息的消費耗時和消 費併發度了。
如果由於業務處理邏輯複雜等原因,導致處理單條訊息的耗時較長,則整體的訊息吞吐量肯定不會高,此時就會導致consumer本地緩衝佇列達到上限,停止從服務端拉取訊息。
結論: 訊息堆積的主要瓶頸在於客戶端的消費能力,而消費能力由消費耗時和消費併發度決定。注意,消費 耗時的優先順序要高於消費併發度。即在保證了消費耗時的合理性前提下,再考慮消費併發度問題。
影響訊息處理時長的主要因素是**邏輯。而**邏輯中可能會影響處理時長**主要有兩種型別:cpu內部計算型**和外部i/o操作型**。
通常情況下**中如果沒有複雜的遞迴和迴圈的話,內部計算耗時相對外部i/o操作來說幾乎可以忽略。所以外部io型**是影響訊息處理時長的主要癥結所在。
外部io操作型**舉例:
關於下游系統呼叫邏輯需要進行提前梳理,掌握每個呼叫操作預期的耗時,這樣做是為了能夠 判斷消費邏輯中io操作的耗時是否合理。通常訊息堆積是由於下游系統出現了服務異常或達到 了dbms容量限制,導致消費耗時增加。
一般情況下,消費者端的消費併發度由單節點執行緒數和節點數量共同決定,其值為單節點執行緒數*節點數量。
不過,通常需要優先調整單節點的執行緒數,若單機硬體資源達到了上限,再需要通過橫向擴充套件來提高消費併發度。
單節點執行緒數,即單個consumer所包含的執行緒數量對於普通資訊、延時訊息及事務訊息,併發度計算都是單節點執行緒數*節點數量。節點數量,即consumer group所包含的consumer數量
但對於順序 訊息則是不同的。
順序訊息的消費併發度等於topic的queue分割槽數量。
1)全域性順序訊息:該型別訊息的topic只有乙個queue分割槽(一台機器)。其可以保證該topic的所有訊息被順序消費。
為了保證這個全域性順序性,consumer group中的任何乙個機器在同一時刻只能有乙個consumer消費者進行消費。所以其併發度為1。
2)分割槽順序訊息:該型別訊息的topic有多個queue分割槽(乙個topic下的queue分布在集群中的多個broker中)。
其僅可以保證該topic的某台broker的訊息被順序消費,不能保證整個topic中訊息的順序消費。
為了保證這個分割槽順序性, 每個queue分割槽中的訊息在consumer group中的同一時刻只能有乙個consumer的乙個執行緒進行 消費。即,在同一時刻最多會出現多個queue分蘖有多個consumer的多個執行緒並行消費。所以其併發度為topic的分割槽數量(該topic的broker分割槽數量)。
對於一台主機中線程池中線程數的設定需要謹慎,不能盲目直接調大執行緒數,設定過大的執行緒數反而會
帶來大量的執行緒切換的開銷。理想環境下單節點的最優執行緒數計算模型為:c *(t1 + t2)/ t1。
最優執行緒數 = c **(t1 + t2)/ t1 = c * t1/t1 + c * t2/t1 = c + c * t2/t1*
注意,該計算出的數值是理想狀態下的理論資料,在生產環境中,不建議直接使用。而是根據為了避免在業務使用時出現非預期的訊息堆積和消費延遲問題,需要在前期設計階段對整個業務邏輯進當前環境,先設定乙個比該值小的數值然後觀察其壓測效果,然後再根據效果逐步調大執行緒
數,直至找到在該環境中效能最佳時的值。
行完善的排查和梳理。其中最重要的就是梳理訊息的消費耗時和設定訊息消費的併發度。
梳理訊息的消費耗時
通過壓測獲取訊息的消費耗時,並對耗時較高的操作的**邏輯進行分析。梳理訊息的消費耗時需要關
注以下資訊:
設定消費併發度
對於訊息消費併發度的計算,可以通過以下兩步實施
節點數 = 流量峰值 / 單個節點訊息吞吐量
訊息被消費過後會被清理掉嗎?不會的。
訊息是被順序儲存在commitlog檔案的,且訊息大小不定長,所以訊息的清理是不可能以訊息為單位進 行清理的,而是以commitlog檔案為單位進行清理的。否則會急劇下降清理效率,並且實現起來邏輯複雜。
commitlog檔案存在乙個過期時間,預設為72小時,即三天。除了使用者手動清理外,在以下情況下也 會被自動清理,無**件中的訊息是否被消費過:
需要注意以下幾點:1)對於rocketmq系統來說,刪除乙個1g大小的檔案,是乙個壓力巨大的io操作。在刪除過程 中,系統效能會驟然下降。所以,其預設清理時間點為凌晨4點,訪問量最小的時間。也正因如 果,我們要保障磁碟空間的空閒率,不要使系統出現在其它時間點刪除commitlog檔案的情況。
2)官方建議rocketmq服務的linux檔案系統採用ext4。因為對於檔案刪除操作,ext4要比ext3性 能更好
RabbitMQ訊息堆積問題?
有時可能因為消費者自身 問題,導致沒辦法正常消費訊息,那麼就會導致訊息佇列中會堆積大量的訊息 或因為同一時間來了非常多的訊息,消費者沒辦法及時消費,導致訊息佇列中堆積了大量訊息。1.去優化消費者 提高消費能力。減少消費時間 2.可以給消費設定年齡 生命週期 如果超時就丟棄掉。可以不讓訊息大量堆積在訊...
RocketMQ訊息型別
普通資訊也叫做無序訊息,簡單來說就是沒有順序的訊息,producer 只管傳送訊息,consumer 只管接收訊息,至於訊息和訊息之間的順序並沒 可能先傳送的訊息先消費,也可能先傳送的訊息後消費。舉個簡單例子,producer 依次傳送 order id 為 1 2 3 的訊息到 broker,co...
RocketMQ 事務訊息
一 事務訊息實現方式 應用使用事務訊息的步驟 1 應用傳送訊息,使用prepare欄位標示準備訊息 2 應用執行本地業務邏輯 3 應用傳送事務提交或回滾訊息 broker收到prepare訊息後會將topic替換為rmq sys trans half topic,queueid替換為0,然後寫入co...