介紹
mq全稱為message queue, 是一種分布式應用程式的的通訊方法,它是消費-生產者模型的乙個典型的代表,producer往訊息佇列中不斷寫入訊息,而另一端consumer則可以讀取或者訂閱佇列中的訊息。rabbitmq是mq產品的典型代表,是一款基於amqp協議可復用的企業訊息系統
系統架構
rabbitmq系統最核心的元件是exchange和queue,exchange和queue是在rabbitmq server(又叫做broker)端,producer和consumer在應用端。
原理大致圖(mq:message queue):
訊息佇列,提供了fifo的處理機制,具有快取訊息的能力。rabbitmq中,佇列訊息可以設定為持久化,臨時或者自動刪除。
設定為持久化的佇列,queue中的訊息會在server本地硬碟儲存乙份,防止系統crash,資料丟失
設定為臨時佇列,queue中的資料在系統重啟之後就會丟失
設定為自動刪除的佇列,當不存在使用者連線到server,佇列中的資料會被自動刪除
exchange類似於資料通訊網路中的交換機,提供訊息路由策略。rabbitmq中,producer不是通過通道直接將訊息傳送給queue,而是先傳送給exchange。乙個exchange可以和多個queue進行繫結,producer在傳遞訊息的時候,會傳遞乙個routing_key,exchange會根據這個routing_key按照特定的路由演算法,將訊息路由給指定的queue。和queue一樣,exchange也可設定為持久化,臨時或者自動刪除。
exchange有4種型別:direct(預設),fanout, topic, 和headers,不同型別的exchange**訊息的策略有所區別:
direct直接交換器,工作方式類似於單播,exchange會將訊息傳送完全匹配routing_key的queue
fanout廣播是式交換器,不管訊息的routing_key設定為什麼,exchange都會將訊息**給所有繫結的queue
topic主題交換器,工作方式類似於組播,exchange會將訊息**和routing_key匹配模式相同的所有佇列,比如,routing_key為user.stock的message會**給繫結匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的佇列。( * 表是匹配乙個任意片語,#表示匹配0個或多個片語)
headers訊息體的header匹配(ignore)
所謂繫結就是將乙個特定的 exchange 和乙個特定的 queue 繫結起來。exchange 和queue的繫結可以是多對多的關係
假設p1和c1註冊了相同的broker,exchange和queue。p1傳送的訊息最終會被c1消費。基本的通訊流程大概如下所示:
p1生產訊息,傳送給伺服器端的exchange
exchange收到訊息,根據routinkey,將訊息**給匹配的queue1
queue1收到訊息,將訊息傳送給訂閱者c1
c1收到訊息,傳送ack給佇列確認收到訊息
queue1收到ack,刪除佇列中快取的此條訊息
consumer收到訊息時需要顯式的向rabbit broker傳送basic.ack訊息或者consumer訂閱訊息時設定auto_ack引數為true。在通訊過程中,佇列對ack的處理有以下幾種情況:
如果consumer接收了訊息,傳送ack,rabbitmq會刪除佇列中這個訊息,傳送另一條訊息給consumer。
如果cosumer接受了訊息, 但在傳送ack之前斷開連線,rabbitmq會認為這條訊息沒有被deliver,在consumer在次連線的時候,這條訊息會被redeliver。
如果consumer接受了訊息,但是程式中有bug,忘記了ack,rabbitmq不會重**送訊息。
rabbitmq2.0.0和之後的版本支援consumer reject某條(類)訊息,可以通過設定requeue引數中的reject為true達到目地,那麼rabbitmq將會把訊息傳送給下乙個註冊的consumer。
先用 composer 載入 mq 拓展檔案
}
class rabbitmq extends commandprotected function execute(input $input, output $output)
elseif ($type=='pro')
echo 'mq test end' .php_eol;
}// 消費
protected function consumption()
{//連線rabbitmq
$conn = new amqpstreamconnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['pwd']
);// 開啟乙個通道
$channel = $conn->channel();
// 對於正在繁忙的客戶端,沒得到回應之前,不向其傳送新訊息
$channel->basic_qos(null,1,null);
// 宣告乙個佇列 第三個引數為宣告佇列永續性
$channel->queue_declare($this
訊息佇列Rabbitmq
rabbitmq server rabbitmqctl reset rabbitmqctl stop rabbitmqctl stop rabbitmqctl list users rabbitmqctl list queues rabbitmqctl add user user name user...
訊息佇列RabbitMQ
這是乙個很嚴肅的問題。系統之間解除耦合,可以讓不同語言編寫的系統通訊互動 保證伺服器負載不會飆公升。高大上一點就是流量削峰。讓程式變成非同步,提高響應速度。把費時任務放到另乙個程序或執行緒去執行。redis實現 剛開始學習redis時,一看這個鍊錶不就是給佇列準備的嗎?所以,一心扎進去,要寫個佇列出...
RabbitMQ訊息佇列
訊息發布接收流程 接收訊息 工作模式 publish subscribe 發布訂閱模式 發布訂閱publish subscribe和工作模式work queues的區別 routing 路由模式 區別 topics 區別 header 宣告佇列 bean queue inform sms publi...