rabbitMQ工作佇列

2021-08-29 18:02:40 字數 3453 閱讀 2989

簡介:傳送耗時的任務給多個工作者,直到任務完成,返回給mq資訊,mq刪除佇列中的訊息。如果沒有收到返回資訊,就斷掉了,mq重新傳送該條資訊

$data = implode(' ', array_slice($argv, 1));

if(empty($data)) $data = "hello world!";

$msg = new amqpmessage($data,

array('delivery_mode' => amqpmessage::delivery_mode_persistent) # 使訊息持久化

);$channel->basic_publish($msg, '', 'task_queue');

echo " [x] sent ", $data, "\n";

$callback = function($msg);

$channel->basic_qos(null, 1, null);

$channel->basic_consume('task_queue', '', false, true, false, false, $callback);

迴圈排程:使用工作佇列的好處是,可以並行的處理佇列,如果又很多任務,新增工作者就好了,擴充套件簡單。

mq會按順序將訊息發給每個消費者,平均每個消費者收到的訊息數量一樣,這種傳送訊息的方式,叫做輪詢。

訊息確認:當處理一些比較耗時的任務時,需要知道客戶端是否執行到一半就掛掉了。為了防止訊息丟失,消費者返回服務端乙個ack響應,告訴服務端,已經處理好某條訊息,可以在佇列中刪除這條訊息了。

訊息響應預設是開啟的。之前的例子中我們可以使用no_ack=true標識把它關閉。是時候設定的第四個引數basic_consume為false (true 意味著不響應ack) ,當工作者(worker)完成了任務,就傳送乙個響應。

$callback = function($msg);

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

執行上面的**,我們發現即使使用ctrl+c殺掉了乙個工作者(worker)程序,訊息也不會丟失。當工作者(worker)掛掉這後,所有沒有響應的訊息都會重新傳送。

乙個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。訊息在你的程式退出之後就會重新傳送,如果它不能夠釋放沒響應的訊息,rabbitmq就會占用越來越多的記憶體。

訊息持久化:

為了不讓佇列消失,需要把佇列宣告為持久化(durable)。為此我們通過queue_declare的第三引數為true:

$channel->queue_declare('hello', false, true, false, false);
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的**中修改。

這時候,我們就可以確保在rabbitmq重啟之後queue_declare佇列不會丟失。另外,我們需要把我們的訊息也要設為持久化——設定delivery_mode = 2。

$msg = new amqpmessage($data,

array('delivery_mode' => amqpmessage::delivery_mode_persistent)

);

公平排程:

我們可以使用basic.qos方法,並設定prefetch_count=1。這樣是告訴rabbitmq,再同一時刻,不要傳送超過1條訊息給乙個工作者(worker),直到它已經處理了上一條訊息並且作出了響應。這樣,rabbitmq就會把訊息分發給下乙個空閒的工作者(worker)。

$channel->basic_qos(null, 1, null);
注:

rabbitmq不允許你使用不同的引數重新定義乙個佇列,它會返回乙個錯誤。如果佇列名已經建立了,不允許重新定義它。

總結:task.php

<?php

require_once __dir__ . '/vendor/autoload.php';

use phpamqplib\connection\amqpstreamconnection;

use phpamqplib\message\amqpmessage;

$connection = new amqpstreamconnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));

if(empty($data)) $data = "hello world!";

$msg = new amqpmessage($data,

array('delivery_mode' => amqpmessage::delivery_mode_persistent)

);$channel->basic_publish($msg, '', 'task_queue');

echo " [x] sent ", $data, "\n";

$channel->close();

$connection->close();

worker.php

<?php

require_once __dir__ . '/vendor/autoload.php';

use phpamqplib\connection\amqpstreamconnection;

$connection = new amqpstreamconnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] waiting for messages. to exit press ctrl+c', "\n";

$callback = function($msg);

$channel->basic_qos(null, 1, null);

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks))

$channel->close();

$connection->close();

RabbitMQ 工作佇列

rabbitmq是訊息 它接收資訊和 資訊。你可以把他考慮成乙個郵局。當你講郵寄的信放在郵局時,你可以確定郵差先生或者女士會把郵件最終送到你的收件人手中。當然郵局和rabbitmq最大的區別,rabbitmq不接受紙張,它只接收,儲存,二進位制的資料訊息快。下面講一些rabbitmq中的術語 注意 ...

RabbitMQ工作佇列

工作佇列也叫任務佇列,主要思想就是避免立即執行資源密集型任務,必須等待完成,才能繼續下乙個任務,你可以執行多個工人,佇列裡的工作他們可以共同不重複的完成。1 佇列優點之一就是能夠輕鬆平行的工作。如果積壓工作,我們可以增加更多的工人。預設情況下,rabbitmq將按順序將每條訊息傳送給下乙個工作者。平...

RabbitMQ的工作佇列和路由

工作佇列 working queue 工作佇列這個概念與簡單的傳送 接收訊息的區別就是 接收方接收到訊息後,可能需要花費更長的時間來處理訊息,這個過程就叫乙個work task。幾個概念 分配 多個接收端接收同乙個queue時,如何分配?訊息確認 server端如何確定接收方的work已經對訊息進行...