工作佇列:working queue
工作佇列這個概念與簡單的傳送/接收訊息的區別就是:接收方接收到訊息後,可能需要花費更長的時間來處理訊息,這個過程就叫乙個work/task。
幾個概念
分配:多個接收端接收同乙個queue時,如何分配?
訊息確認:server端如何確定接收方的work已經對訊息進行了完整的處理?
訊息持久化:傳送方、服務端queue如何對未處理的訊息進行磁碟持久化?
round-robin分配
多個接收端接收同乙個queue時,採用了round-robin分配演算法,即輪叫排程——依次分配給各個接收方。
訊息確認
對於耗時的work,可以先關閉自動訊息確認,在work完成後,再手動發回確認。
channel.basicconsume("hello",false/*關閉自動訊息確認*/,consumer);
// ...work完成後
channel.basicack(delivery.getenvelope().getdeliverytag(), false);
持久化
1. server端的queue持久化
注意的是,如果已經宣告了同名非持久化的queue,則再次宣告無效。
傳送方和接收方都需要指定該引數。
boolean durable = true;
channel.queuedeclare("task_queue", durable, false, false, null);
2. message持久化
channel.basicpublish("", "task_queue", messageproperties.
persistent_text_plain,message.getbytes());
負載分配
為了解決各個接收端工作量相差太大的問題(有的一直busy,有的空閒比較多),突破round-robin。
int prefetchcount = 1;
channel.basicqos(prefetchcount);
意思為,最多為當前接收方傳送一條訊息。如果接收方還未處理完畢訊息,還沒有回發確認,就不要再給他分配訊息了,應該把當前訊息分配給其它空閒接收方。
場景示例:訊息傳送方傳送了型別為[error][info]的兩種訊息,寫磁碟的訊息接受者只接受error型別的訊息,console列印的接收兩者。
(上圖採用了不同顏色來作為routingkey)
傳送方
connectionfactory factory = new接收方connectionfactory();
factory.sethost("localhost");
connection connection =factory.newconnection();
channel channel =connection.createchannel();
channel.exchangedeclare(exchange_name, "direct"/*
exchange型別為direct
*/);
channel.basicpublish(exchange_name, "info"/*
*/, null
, message.getbytes());
channel.close();
connection.close();
connectionfactory factory = new* 表示乙個word;connectionfactory();
factory.sethost("localhost");
connection connection =factory.newconnection();
channel channel =connection.createchannel();
channel.exchangedeclare(exchange_name, "direct"/*
exchange型別為direct
*/);
//建立匿名queue
string queuename =channel.queuedeclare().getqueue();
"error");
channel.queuebind(quuename,exchange_name,"info");
queueingconsumer consumer = new
queueingconsumer(channel);
channel.basicconsume(queuename,
true
, consumer);
queueingconsumer.delivery delivery = consumer.nextdelivery(); //
blocking...
string message = new
string(delivery.getbody());
string routingkey = delivery.getenvelope().getroutingkey(); //
# 表示0個或者多個word;
傳送方
connectionfactory factory = new接收方connectionfactory();
factory.sethost("localhost");
connection connection =factory.newconnection();
channel channel =connection.createchannel();
channel.exchangedeclare(exchange_name, "topic"/*
exchange型別
*/);
channel.basicpublish(exchange_name, "***.yyy"/*
*/, null
, message.getbytes());
channel.close();
connection.close();
connectionfactory factory = newrefsconnectionfactory();
factory.sethost("localhost");
connection connection =factory.newconnection();
channel channel =connection.createchannel();
channel.exchangedeclare(exchange_name, "topic"/*
exchange型別
*/);
//建立匿名queue
string queuename =channel.queuedeclare().getqueue();
"*.yyy");
queueingconsumer consumer = new
queueingconsumer(channel);
channel.basicconsume(queuename,
true
, consumer);
queueingconsumer.delivery delivery = consumer.nextdelivery(); //
blocking...
string message = new
string(delivery.getbody());
string routingkey = delivery.getenvelope().getroutingkey(); //
rabbitMQ工作佇列
簡介 傳送耗時的任務給多個工作者,直到任務完成,返回給mq資訊,mq刪除佇列中的訊息。如果沒有收到返回資訊,就斷掉了,mq重新傳送該條資訊 data implode array slice argv,1 if empty data data hello world msg new amqpmessa...
RabbitMQ 工作佇列
rabbitmq是訊息 它接收資訊和 資訊。你可以把他考慮成乙個郵局。當你講郵寄的信放在郵局時,你可以確定郵差先生或者女士會把郵件最終送到你的收件人手中。當然郵局和rabbitmq最大的區別,rabbitmq不接受紙張,它只接收,儲存,二進位制的資料訊息快。下面講一些rabbitmq中的術語 注意 ...
RabbitMQ工作佇列
工作佇列也叫任務佇列,主要思想就是避免立即執行資源密集型任務,必須等待完成,才能繼續下乙個任務,你可以執行多個工人,佇列裡的工作他們可以共同不重複的完成。1 佇列優點之一就是能夠輕鬆平行的工作。如果積壓工作,我們可以增加更多的工人。預設情況下,rabbitmq將按順序將每條訊息傳送給下乙個工作者。平...