我們在activemq訊息持久化訂閱中,介紹了對topic模式下的訊息進行持久化訂閱,使其在暫無消費者消費或activemq服務重啟的情況下,不會導致訊息的丟失,這裡其實就是保證了一定程度的訊息可靠性。
那麼還會在其他地方傳送訊息不可靠的情況麼,首先我們從訊息的生產及消費的流程中來看,訊息有生產者傳送到activemq訊息佇列中,這個過程傳送訊息會發生訊息一般在queue模式下,是不會存在問題的,而在topic模式下我們也是可以進行持久化訂閱的,見activemq訊息持久化訂閱。
因為在生產者端,我們會使用send()
方法向activemq傳送訊息,預設情況下,持久化訊息以同步方式傳送,send() 方法會被阻塞,直到 broker 傳送乙個確認訊息給生產者,這個確認訊息表示broker已經成功接收到訊息,並且持久化訊息已經把訊息儲存到儲存中。
這裡我們又會發現,上述我們一共傳送了三次訊息,每次只有在send()
方法成功才會確定提交到我們的activemq,但是如果在業務中,這三條訊息是關聯的,這裡我們希望要不一起成功,要不一起失敗,那麼該如何處理呢。
這裡我們可以使用事務提交,事務中訊息(無論是否持久化),會進行非同步傳送,send() 方法不會被阻塞。但是commit() 方法會被阻塞,直到收到確認訊息,表示broker已經成功接收到訊息,並且持久化訊息已經把訊息儲存到儲存中。
這裡我們就會發現開啟了事務訊息之後,我們在send()方法之後,訊息不會立即到達activemq中,而必須等待commit之後,這裡我們可以通過activemq控制台,進行debug驗證,其實這個事務訊息和資料庫事務非常的類似。
看完了訊息的生產者,那麼我們訊息的消費者在消費訊息時,會產生問題麼?如訊息傳送成功後,接收端接收到了訊息。然後進行處理,但是可能由於某種原因,高併發也好,io阻塞也好,這條訊息在接收端處理失敗了。
但是如在queue模式下的特性是一條訊息,只會被乙個接收端給接收,只要接收端a接收成功了,接收端b就不可能接收到這條訊息,如果該訊息是一些比較重要的訊息,那麼就必須要保證訊息的可靠性。在消費者端對對訊息的確認有4種機制,如下:
首先我們再看第乙個auto_acknowledge,即自動確認應答,這裡我們需要特別注意的是,需要使用非同步的方式來消費訊息,因為在同步接受訊息時,在receive()方法接受到訊息後(還未進行業務處理),這時消費者就會自動確認
而在非同步接受訊息的時候,即使用***去消費訊息,這裡會直到onmessage()方法執行完成後(包括了業務處理),才會去自動確認
其中如果發生了異常,那麼activemq會對其進行重發,缺省會重試6次,重發6次失敗後,就會進入死信佇列中,這裡我們在上述給其手動丟擲乙個異常,結果如下:
注意:當我們使用messagelistener方式消費訊息時,可以在onmessage()方法中使用try-catch,這樣可以在處理訊息出錯時記錄一些資訊,而不是讓不斷去重發訊息;如果你沒有使用try-catch,就有可能會因為異常而導致訊息重複接收的問題,這時就必須需要注意onmessage()方法中邏輯是否能夠相容對重複訊息的判斷。
第二個client_acknowledge,表示客戶端手動確認,這就意味著acitvemq將不會自動的為我們進行ack任何訊息,需要我們自己擇機確認。
這裡我們需要就會使用同步消費方法了,因為最終是由我們自己進行確認,但是如果忘記呼叫acknowledge()方法,將會導致當consumer重啟後,會接受到重複訊息,因為對於broker而言,那些尚未真正ack的訊息被視為「未消費」。
我們可以在當前訊息處理成功之後,可以立即呼叫message.acknowledge()
方法來"逐個"確認訊息,這樣可以盡可能的減少因網路故障而導致訊息重發的個數;當然也可以處理多條訊息之後,間歇性的呼叫acknowledge()
方法來一次確認多條訊息,減少ack的次數來提公升consumer的效率,不過需要自行權衡。
dups_ok_acknowledge該引數類似於auto_ack確認機制,可以自動批量確認,而且具有「延遲」確認的特點,activemq會根據內部演算法,在收到一定數量的訊息自動進行確認。
在此模式下,可能會出現重複訊息,因為當consumer故障重啟後,那些尚未ack的訊息(但是實際上已經消費了),就會重新傳送過來。
最後乙個session_transacted,當session使用事務時,就是使用此模式。當決定事務中的訊息可以確認時,必須呼叫session.commit()
方法,commit()
方法將會導致當前 session 的事務中所有訊息立即被確認。
其實獲取session時,該方法的第乙個引數表示是否支援事務,如果為true,則會忽略第二個引數,自動被jms伺服器設定為session_transacted
在事務開始之後的任何時機呼叫rollback()
,意味著當前事務的結束,事務中所有的訊息都將被重發。當然在commit() 之前丟擲異常,也會導致事務的rollback()
這裡其實和我們上述在訊息生成者中,進行設定true使用了事務,其作用是相同的,就是我們需要使用commit()
來進行事務的提交。
ActiveMQ訊息的可靠性機制
1.jms訊息確認機制 jms訊息只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段 客戶接收訊息 客戶處理訊息和訊息被確認。在事務性會話中,當乙個事務被提交的時候,確認自動發生。在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式 acknowledgement mo...
ActiveMQ可靠性機制
訊息的簽收 acknowledgment 客戶端成功接收一條訊息的標誌是這條訊息被簽收。成功接收一條訊息一般包括如下三個階段 1 客戶端接收訊息 2 客戶端處理訊息 3 訊息被簽收 簽收可以由activemq發起,也可以由客戶端發起,取決於session簽收模式的設定。在帶事務的session中,簽...
Kafka訊息可靠性
如果mq沒有類似資料庫事務結構和保證,是不可能達到訊息投遞100 可靠的,極端情況下訊息投遞要麼丟失或重複。下面咋們從producer,broker,consumer的角度分析一下kafka中會出現哪些情況。目前生產者傳送訊息 request.required.acks 有三種方式。acks 0 p...