訊息佇列RabbitMQ

2022-09-09 21:48:30 字數 4378 閱讀 3796

mq全稱為message queue,即訊息佇列。「訊息佇列」是在訊息的傳輸過程中儲存訊息的容器。它是典型的:生產者、消費者模型。生產者不斷向訊息佇列中生產訊息,消費者不斷的從佇列中獲取訊息。因為訊息的生產和消費都是非同步的,而且只關心訊息的傳送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

訊息佇列中介軟體是分布式系統中重要的元件,主要解決應用解耦,非同步訊息,流量削鋒等問題,實現高效能,高可用,可伸縮和最終一致性架構

實現mq的有兩種主流方式:jms和amqp兩種協議,下面是兩種協議的對比圖

訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可 能需要永續性儲存)等。

訊息的生產者,也是乙個向交換器發布訊息的客戶端應用程式。

交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。

exchange有4種型別:direct(預設),fanout,topic,和headers,不同型別的exchange**訊息的策略有所區別

訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。乙個訊息可投入乙個或多個佇列。訊息一直 在佇列裡面,等待消費者連線到這個佇列將其取走

路由鍵,生產者在將訊息傳送給exchange的時候,一般會指定乙個routing key,來指定這個訊息的路由規則,而這個routing key需要與exchange type及binding key聯合使用才能最終生效。(exchange type 就是 exchange的路由型別, binding key就是exchange與queue繫結時的乙個key,相當於給這個繫結取個名字.這個名字是跟 routing key 相同或者包含的)

繫結,用於訊息佇列和交換器之間的關聯。乙個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交 換器理解成乙個由繫結構成的路由表。

exchange和queue的繫結可以是多對多的關係。

網路連線,比如乙個tcp連線。

通道,多路復用連線中的一條獨立的雙向資料流通道。通道是建立在真實的tcp連線內的虛擬連線,amqp命令都是通過通道 發出去的,不管是發布訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷毀tcp都 是非常昂貴的開銷,所以引入了通道的概念,以復用一條tcp連線。

訊息的消費者,表示乙個從訊息佇列中取得訊息的客戶端應用程式。

虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加 密環境的獨立伺服器域。每個 vhost 本質上就是乙個mini版的rabbitmq 伺服器,擁 有自己的佇列、交換器、繫結和許可權機制。vhost是amqp概念的基礎,必須在連線時指定,rabbitmq 預設的vhost是/。

表示訊息佇列伺服器實體

exchange分發訊息時根據型別的不同分發策略有區別,目前共四種型別: direct、fanout、topic、headers。headers匹配amqp訊息的 header而不是路由鍵,headers交換器和direct交換器完全一致,但效能差很多,目前幾乎用不到了,所以直接看另外三種型別:

訊息中的路由鍵(routing key)如果和binding中的binding key一致,交換器就將訊息發到對應的佇列中。路由鍵與佇列名完全匹配,如果乙個佇列繫結到交換機要求路由鍵為"dog」,則只**routing key標記為「dog」的訊息,不會** "dog.puppy」,也不會**「dog.guard"等等。它是完全匹配、單播的模式。

每個發到fanout型別交換器的訊息都會分到所有繫結的佇列上去。fanout交換器不處理路由鍵,只是簡單的將佇列繫結到交換器上,每個傳送到交換器的訊息都會被**到與該交換器繫結的所有佇列上。fanout型別**訊息是最快的。

topic交換器通過模式匹配分配訊息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時佇列需要繫結到乙個模式上。它將路由鍵和繫結鍵的字串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個萬用字元:符號「#」和符號"x"。#幾個或多個單詞,*匹配乙個單詞。

在rabbitmq的官方教程中,rabbitmq為我們提供了6中訊息模型,第6種是rpc(遠端呼叫),不是mq(訊息佇列),所以rabbitmq真正的訊息模型只有5種

乙個生產者對應乙個消費者

work佇列模型是為了彌補基本訊息模型的缺陷而誕生的,是通過乙個佇列繫結多個消費者,以達到避免訊息在佇列中堆積的問題

direct(選擇性傳送),交換機不會將訊息傳送給所有佇列,而是通過佇列繫結到交換機上的routingkey(路由key)傳送到指定的佇列中

因為網路傳輸的不穩定性,當生產者在向mq傳送訊息的過程中,mq沒有接收到訊息

解決辦法:

1.使用事務(效能差)

使用amqp協議提供的乙個事務機制,在生產者傳送訊息之前,通過channel.txselect開啟乙個事務,接著傳送訊息, 如果訊息投遞server失敗,進行事務回滾channel.txrollback,然後重新傳送, 如果server收到訊息,就提交事務channel.txcommit

但是,很少有人這麼幹,因為這是同步操作,一條訊息傳送之後會使傳送端阻塞,以等待rabbitmq-server的回應,之後才能繼續傳送下一條訊息,生產者生產訊息的吞吐量和效能都會大大降低

2.傳送方確認機制(publisher confirm)

生產者開啟confirm模式之後,你每次寫的訊息都會分配乙個唯一的id,然後如果寫入了rabbitmq中,rabbitmq會給你回傳乙個ack訊息,告訴這個訊息成功還是失敗了。如果rabbitmq沒能處理這個訊息,會**你乙個nack介面,告訴你這個訊息接收失敗,可以重試。

3.做好容錯方法(try-catch)

傳送訊息可能會網路失敗,失敗後要有重試機制,可記錄到資料庫,採用定期掃瞄重發的方式。每一條訊息都做好日誌記錄,給資料庫儲存每乙個訊息的記錄,定期掃瞄資料庫將失敗的訊息重新傳送。

訊息抵達訊息佇列,要將訊息進行持久化,寫入磁碟才算成功。此時mq沒有持久化成功,mq宕機了造成訊息丟失

解決辦法:

傳送方確認機制(publisher confirm)

生產者開啟confirm模式之後,rabbitmq會給你回傳乙個ack訊息,告訴這個訊息成功還是失敗了。如果rabbitmq沒能處理這個訊息,會**你乙個nack介面,告訴你這個訊息接收失敗,此時修改資料庫中的mq狀態,進行訊息定期重試。配合上面的容錯方法

消費者剛拿到訊息準備消費的時候,此時消費端沒有消費訊息,mq使用的是自動ack模式,mq就會認為你已經消費了,把訊息丟掉

解決辦法

一定開啟手動ack,消費成功才移除,失敗或者沒來得及處理就noack並重新入隊

訊息消費成功,事務已經提交,ack時,機器宕機。導致沒有ack成功,broker的訊息 重新由unack變為ready,並傳送給其他消費者

解決辦法

消費者的業務消費介面應該設計為冪等性的。比如扣庫存有 工作單的狀態標誌

使用防重表(redis/mysql),傳送訊息每乙個都有業務的唯 一標識,處理過就不用處理

rabbitmq的每乙個訊息都有redelivered欄位,可以獲取是否 是被重新投遞過來的,而不是第一次投遞過來的

解決辦法

訊息佇列Rabbitmq

rabbitmq server rabbitmqctl reset rabbitmqctl stop rabbitmqctl stop rabbitmqctl list users rabbitmqctl list queues rabbitmqctl add user user name user...

訊息佇列RabbitMQ

這是乙個很嚴肅的問題。系統之間解除耦合,可以讓不同語言編寫的系統通訊互動 保證伺服器負載不會飆公升。高大上一點就是流量削峰。讓程式變成非同步,提高響應速度。把費時任務放到另乙個程序或執行緒去執行。redis實現 剛開始學習redis時,一看這個鍊錶不就是給佇列準備的嗎?所以,一心扎進去,要寫個佇列出...

RabbitMQ訊息佇列

訊息發布接收流程 接收訊息 工作模式 publish subscribe 發布訂閱模式 發布訂閱publish subscribe和工作模式work queues的區別 routing 路由模式 區別 topics 區別 header 宣告佇列 bean queue inform sms publi...