簡介:傳送耗時的任務給多個工作者,直到任務完成,返回給mq資訊,mq刪除佇列中的訊息。如果沒有收到返回資訊,就斷掉了,mq重新傳送該條資訊
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "hello world!";
$msg = new amqpmessage($data,
array('delivery_mode' => amqpmessage::delivery_mode_persistent) # 使訊息持久化
);$channel->basic_publish($msg, '', 'task_queue');
echo " [x] sent ", $data, "\n";
$callback = function($msg);
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, true, false, false, $callback);
迴圈排程:使用工作佇列的好處是,可以並行的處理佇列,如果又很多任務,新增工作者就好了,擴充套件簡單。
mq會按順序將訊息發給每個消費者,平均每個消費者收到的訊息數量一樣,這種傳送訊息的方式,叫做輪詢。
訊息確認:當處理一些比較耗時的任務時,需要知道客戶端是否執行到一半就掛掉了。為了防止訊息丟失,消費者返回服務端乙個ack響應,告訴服務端,已經處理好某條訊息,可以在佇列中刪除這條訊息了。
訊息響應預設是開啟的。之前的例子中我們可以使用no_ack=true標識把它關閉。是時候設定的第四個引數basic_consume為false (true 意味著不響應ack) ,當工作者(worker)完成了任務,就傳送乙個響應。
$callback = function($msg);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
執行上面的**,我們發現即使使用ctrl+c殺掉了乙個工作者(worker)程序,訊息也不會丟失。當工作者(worker)掛掉這後,所有沒有響應的訊息都會重新傳送。
乙個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。訊息在你的程式退出之後就會重新傳送,如果它不能夠釋放沒響應的訊息,rabbitmq就會占用越來越多的記憶體。
訊息持久化:
為了不讓佇列消失,需要把佇列宣告為持久化(durable)。為此我們通過queue_declare的第三引數為true:
$channel->queue_declare('hello', false, true, false, false);
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的**中修改。
這時候,我們就可以確保在rabbitmq重啟之後queue_declare佇列不會丟失。另外,我們需要把我們的訊息也要設為持久化——設定delivery_mode = 2。
$msg = new amqpmessage($data,
array('delivery_mode' => amqpmessage::delivery_mode_persistent)
);
公平排程:
我們可以使用basic.qos方法,並設定prefetch_count=1。這樣是告訴rabbitmq,再同一時刻,不要傳送超過1條訊息給乙個工作者(worker),直到它已經處理了上一條訊息並且作出了響應。這樣,rabbitmq就會把訊息分發給下乙個空閒的工作者(worker)。
$channel->basic_qos(null, 1, null);
注:
rabbitmq不允許你使用不同的引數重新定義乙個佇列,它會返回乙個錯誤。如果佇列名已經建立了,不允許重新定義它。
總結:task.php
<?php
require_once __dir__ . '/vendor/autoload.php';
use phpamqplib\connection\amqpstreamconnection;
use phpamqplib\message\amqpmessage;
$connection = new amqpstreamconnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "hello world!";
$msg = new amqpmessage($data,
array('delivery_mode' => amqpmessage::delivery_mode_persistent)
);$channel->basic_publish($msg, '', 'task_queue');
echo " [x] sent ", $data, "\n";
$channel->close();
$connection->close();
worker.php
<?php
require_once __dir__ . '/vendor/autoload.php';
use phpamqplib\connection\amqpstreamconnection;
$connection = new amqpstreamconnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] waiting for messages. to exit press ctrl+c', "\n";
$callback = function($msg);
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks))
$channel->close();
$connection->close();
RabbitMQ 工作佇列
rabbitmq是訊息 它接收資訊和 資訊。你可以把他考慮成乙個郵局。當你講郵寄的信放在郵局時,你可以確定郵差先生或者女士會把郵件最終送到你的收件人手中。當然郵局和rabbitmq最大的區別,rabbitmq不接受紙張,它只接收,儲存,二進位制的資料訊息快。下面講一些rabbitmq中的術語 注意 ...
RabbitMQ工作佇列
工作佇列也叫任務佇列,主要思想就是避免立即執行資源密集型任務,必須等待完成,才能繼續下乙個任務,你可以執行多個工人,佇列裡的工作他們可以共同不重複的完成。1 佇列優點之一就是能夠輕鬆平行的工作。如果積壓工作,我們可以增加更多的工人。預設情況下,rabbitmq將按順序將每條訊息傳送給下乙個工作者。平...
RabbitMQ的工作佇列和路由
工作佇列 working queue 工作佇列這個概念與簡單的傳送 接收訊息的區別就是 接收方接收到訊息後,可能需要花費更長的時間來處理訊息,這個過程就叫乙個work task。幾個概念 分配 多個接收端接收同乙個queue時,如何分配?訊息確認 server端如何確定接收方的work已經對訊息進行...