持久化可以提高rabbitmq的可靠性,以防在異常情況(重啟、關閉、右機等)下的資料丟失。
rabbitmq的持久化分為三個部分:
持久化客戶端**:
public class send catch (ioexception e) catch (timeoutexception e) finally catch (ioexception e) }}
}
客戶端傳送訊息後,消費端沒有消費訊息,重啟rabbitmq服務後,訊息還在,說明訊息已經實現持久化。
可以將所有的訊息都設定持久化,但影響rabbitmq的效能。因為寫入磁碟速度比記憶體慢。對於可靠性不高的訊息可以不採用持久化來提高整體吞吐量。
將佇列、訊息都設定持久化後不能保證資料百分百不丟失,因為如果在消費者訂閱佇列時將autoack
引數設定為true
,當消費者接收到訊息後,還沒來得及處理就宕機,資料也會丟失。要解決這種情況,需要設定引數autoack
為false
進行手動確認:
public class recvier
});} catch (ioexception e) catch (timeoutexception e)
}}
即使設定了消費者手動確認,也只是減少了資料丟失少,還不能百分百持久化。在持久化訊息存入記憶體後,需要一點點時間(時間很短,但不可忽視)才能存入磁碟。如果在宕機、重啟時,訊息還是可能沒來得及存入磁碟而丟失。rabbitmq還提供有:
預設情況下,傳送訊息的操作是不會返回任何訊息給生產者,也即生產者是不知道訊息有沒有正確地到達伺服器。
如果在訊息到達伺服器之前已經丟失,持久化操作也解決不了這個問題,因為訊息根本沒有到達伺服器。
rabbitmq在生產者確認中提供兩種方式:
開啟事務後,傳送訊息到rabbitmq。如果事務提交成功,則訊息一定到達rabbitmq中;如果在事務提交之前由於rabbitmq異常崩潰或其他原因丟擲異常,將其捕獲,從而進行回滾:
public class send catch (ioexception e)
} catch (ioexception e) catch (timeoutexception e) finally catch (ioexception e) }}
}}
如果要傳送多條訊息,則將channel.basicpublic和channel.txcommit等方法放進迴圈體內:
//開啟事務
channel.txselect();
try
} catch (ioexception e)
使用事務機制能夠解決訊息傳送方和rabbitmq之間訊息確認的問題,但是使用事務機制消耗rabbitmq太多的效能,所以rabbitmq還提供了乙個改進方案,即傳送方確認機制。
前面使用事務機制解決訊息傳送方來確認訊息到達rabbitmq,但該事務機制會嚴重降低rabbitmq的訊息吞吐量,所以推薦使用另一方式---傳送方確認(publisher confirm)機制。
生產者將通道設定成confirm
確認模式,所有在該通道上面發布的訊息都會被指派乙個唯一的id(從1開始),一旦訊息被投遞到所有匹配的佇列之後,rabbitmq就會傳送乙個確認(basic.ack)給生產者(包含訊息的唯一id),這就使得生產者知曉訊息已經正確到達了目的地了。如果訊息和佇列是可持久化的,那麼確認訊息會在訊息寫入磁碟之後發出。
rabbitmq回傳給生產者的確認訊息中的deliverytag包含了確認訊息序號,此外rabbitmq也可以設定channel.basicack方法中的multiple引數,表示這個序號之前的所有訊息已經處理。
事務機制在一條訊息傳送之後會使傳送端阻塞,以等待rabbitmq的回應,之後才能繼續傳送下一條訊息。相比之下,傳送方確認機制最大的好處在於它是非同步的,一旦發布一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用程式便可以通過**方法來處理該確認訊息,如果rabbitmq因為自身內部錯誤導致訊息丟失,就會傳送一條nack(basic.nack)命令,生產者應用程式同樣可以在**方法中處理該nack命令。
傳送方確認機制:
public class send else
channel.waitforconfirmsordie();
} catch (interruptedexception e)
} catch (ioexception e) catch (timeoutexception e) finally catch (ioexception e) }}
}}
如果要傳送多條訊息,修改部分如下:
//將通道設定為傳送方確認模式
channel.confirmselect();
try else
}} catch (interruptedexception e)
對於waitforconfirms
方法,如果通道沒有開啟confirm
模式,丟擲interruptedexception
。不帶參的waitforconfirms
,其返回的條件是客戶端收到相應的basic.ack/nack
或被中斷。帶timeout
參的waitforconfirms
表示超過指定時間丟擲timeoutexception
。對於另外兩個waitforconfirmsordie
方法,在接收到rabbitmq返回的basic.nack
之後會丟擲ioexception
。
publish confirm
模式是每傳送一條訊息後就呼叫channel.waitforconfirms
方法,之後等待服務端的確認,這實際上是一種序列同步等待的方式。事務機制和它一樣,傳送訊息之後等待服務端確認,之後再傳送訊息。兩者的儲存確認原理相同,尤其對於持久化的訊息來說,兩者都需要等待訊息確認存檔之後才會返回(呼叫linux核心的fsync方法)。在同步等待的方式下,publisherconfum機制傳送一條訊息需要通訊互動的命令是2條:basic.publish和basic.ack;事務機制是3條:basic.publish、tx.commmit!.commit-ok(或者tx.rollback!.rollback-ok),事務機制多了乙個命令幀報文的互動,所以qps會略微下降。
注意:事務機制和
publisher confirm
機制兩者是互斥的事務機制和
pubisher confirm
機制確保的是訊息能夠正確地傳送到rabbitmq(交換器),如果交換器沒有佇列,則訊息仍然會丟失。所以需要配合mandatory
引數或者備份交換器使用提高訊息傳輸的可靠性。
publisher confirm
的優勢在於並不一定需要同步確認。改進使用方式有以下兩種:
批量confirm
方法中,客戶端程式需要定期或者定量(達到多少條),亦或者兩者結合起來呼叫channel.waitforconfirms
來等待rabbitmq的確認返回。相比於前面示例中的普通confirm
方法,批量極大地提公升了confirm
的效率,但是問題在於出現返回basic.nack
或者超時情況時,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息數量,並且當訊息經常丟失時,批量confirm的效能應該是不公升反降的。(不推薦使用,了解即可)
try
try
// 將快取中的訊息重新傳送
}catch(interruptedexceptione)
} catch (ioexceptione)
//監聽confirm返回資訊,ack被接收,nack異常
channel.addconfirmlistener(new confirmlistener()
} else
}//異常不能被接收,可以從這重新設定進行傳送
@override
public void handlenack(long deliverytag, boolean multiple) throws ioexception
});} catch (ioexception e) catch (timeoutexception e)
}}
對於在實際生產環境中,建議使用非同步confirm
。 Kafka訊息可靠性
如果mq沒有類似資料庫事務結構和保證,是不可能達到訊息投遞100 可靠的,極端情況下訊息投遞要麼丟失或重複。下面咋們從producer,broker,consumer的角度分析一下kafka中會出現哪些情況。目前生產者傳送訊息 request.required.acks 有三種方式。acks 0 p...
二 訊息可靠性
可以在下面兩個地方進行設定 messageproducer producer session.createproducer queue producer.setdeliverymode deliverymode non persistent for int i 0 i 100 i 2.1 普通top...
ActiveMQ訊息的可靠性
我們在activemq訊息持久化訂閱中,介紹了對topic模式下的訊息進行持久化訂閱,使其在暫無消費者消費或activemq服務重啟的情況下,不會導致訊息的丟失,這裡其實就是保證了一定程度的訊息可靠性。那麼還會在其他地方傳送訊息不可靠的情況麼,首先我們從訊息的生產及消費的流程中來看,訊息有生產者傳送...