在之前的案例中我們建立了乙個工作佇列,這個工作佇列的實現思想就是乙個把每乙個任務平均分配給每乙個執行者,在這個篇文章我們會做一些不一樣的東西,把乙個訊息傳送給多個消費者,這種模式就被稱作"發布/訂閱".
為了說明這個模式,我們將要建立乙個簡單的日誌系統,乙個負責發布訊息,另外乙個負責接收列印他們.
在我們的日誌系統中,每乙個執行中的接收者副本將都會獲得訊息,這種方式可以讓我們在執行乙個接收者直接把訊息儲存在磁碟的同時,另外乙個消費者可以把訊息列印到螢幕上.
本質上,發布乙個日誌訊息將會廣播給所有的接收者
在之前的文章中,我們接受和傳送訊息都是通過乙個佇列來完成了,現在是時候引入rabbitmq的全部工作模型了.
讓我們快速回憶一下之前涉及到的模型
--生產者(發布者),是乙個負責傳送訊息的使用者應用程式.
--佇列,負責儲存訊息
--消費者(接收者),負責接收訊息的使用者程式.
rabbitmq的核心思想是生產者永遠不會直接把訊息傳送給佇列,事實上生產者甚至經常不知道乙個發出去的訊息是否可以有佇列去接收它.
相應的,生產者只能訊息傳送給交換機,交換機的工作機制非常簡單,一方面它從生產者那裡接收到訊息,另一方面它會把訊息傳送給相應的佇列上.交換機必須要知道怎麼處理接收到的訊息,它應該被放入乙個特殊的佇列嗎?它是否應該被放入多個佇列?或者它是否需要被忽略.
處理這工作的方式是通過交換機型別來實現的.
這裡有幾個可用的交換機型別:direct,topic,headers,fanout 我們將會關注最後乙個(fanout),讓我們建立乙個fanout的交換機,名字叫做'logs'
channel.exchangedeclare("這個fanout的交換機功能非常簡單(你也許已經從名字中猜到了他的方式),把接收到的訊息廣播給所有已知的佇列,這個這是我們的日誌系統需要的.logs
", "
fanout
");
列出rabbitmq已新增的交換機:
無命名的交換機:在之前的案例中我們對於交換機一無所知,但是仍然可以把訊息傳送到佇列上,這是因為我們使用的是乙個預設的互動機,名字為空(""),回顧一下我們之前傳送訊息的方式
var message =getmessage(args);第乙個引數就是交換機的名稱,空字串表示預設的無命名的交換機:訊息通過存在的routingkey被傳送到佇列上.var body =encoding.utf8.getbytes(message);
channel.basicpublish(exchange:
"",routingkey:
"hello
", basicproperties:
null
,body: body);
現在我們傳送命名的交換機代替:
var message =getmessage(args);在之前的案例中,我們使用的佇列是乙個指定了名字的佇列(記得hello 和task_queue 嗎),給乙個隊名命名是嚴格的,我們需要執行者連線的同樣的佇列來工作,當你想在生產者和消費者之間共享佇列的時候指定乙個佇列名是非常重要的.但是我們的日誌系統則不在此列,var body =encoding.utf8.getbytes(message);
channel.basicpublish(exchange:
"logs
", routingkey:
"", basicproperties:
null
, body: body);
我們想要監聽到所有的日誌訊息,而不僅僅是他們的子集,我們也僅僅對當前正在流轉的訊息感興趣,而不是老的訊息,結局這個問題我們需要2件事情.
首先,無論何時我們連線到佇列,我們都需要乙個新鮮的,空的佇列,為了實現這個目標我們可以每次建立乙個隨機名稱的佇列,或者更加便捷的方式--讓服務為我們的佇列隨機命名.
第二,一旦我們斷開到消費者到佇列的連線,我們需要自動刪除佇列.
在.net客戶端,我們使用無參的queuedeclare()方法來建立乙個隨機命名的非持久的,自動刪除的排他佇列.
var queuename = channel.queuedeclare().queuename;queuename就是乙個隨機的佇列名,如:amq.gen-jzty20brgko-hjmujj0wlg.
我們已經建立了乙個fanout的交換機和乙個佇列,現在我們需要告訴我們交換機傳送訊息到我們的佇列,交換機和佇列之間的關係叫做繫結.
channel.queuebind(queue: queuename,exchange: "從現在開始logs 交換機將會把訊息放入我們的佇列當中.logs
", routingkey: "");
列出佇列cmd: rabbitmqctl list_bindings
負責傳送訊息的生產者可之前案例基本上是一樣的,最大的不同是我們將訊息傳送到了我們的命名佇列logs上而不是預設的佇列上,傳送的時候我們需要使用routingkey,但是它的值是被fanout交換機忽略的.
emitlog.cs
class正如你看到的,我們在建立連線之後建立了乙個佇列,這一步是必須的,因為傳送到乙個不存在的交換機是不被允許的。emitlog
;
using(var connection =factory.createconnection())
using(var channel =connection.createmodel())
", message);
}console.writeline(
"press [enter] to exit.");
console.readline();
}private
static
string getmessage(string
args)
}
當佇列還沒有繫結到交換機是傳送的訊息將會丟失,但是這對我們日誌系統來說沒有問題,當沒有消費者監聽時我們可以安全的忽略這個訊息。
receivelogs.cs:
class同時執行兩個receive,可以看到兩個接收端可以同時接收到乙個訊息。receivelogs
;
using(var connection =factory.createconnection())
using(var channel =connection.createmodel())
", message);
};channel.basicconsume(queue: queuename,
noack:
true
, consumer: consumer);
console.writeline(
"press [enter] to exit.");
console.readline();}}
}
RabbitMQ(三) 發布訂閱
rabbitmq 三 發布訂閱 一 概述 rabbitmq的發布訂閱 publish subscribe 其將生產者和消費者進一步解耦,生產者生產訊息後,交付給交換機,消費者上線後,主動主動去佇列中取資料進行處理。該模式也符合上一節工作佇列中的ack 預取等規則。發布訂閱模式如下圖所示 二 交換機 ...
譯 Kotlin Native v0 2 發布啦
本文翻譯自官方部落格 我們很高興地宣布kotlin native v0.2 發布啦,這是 kotlin native 技術預覽版的一次功能更新和 bug 修復。這次更新增加了對協程和跨模組內聯函式的支援,以及整體上的問題修復和優化。這次更新包括了乙個演示如何使用 併發非阻塞io協同程式 的示例,乙個...
RabbitMQ 原文譯04 路由
在前一篇文章中我們構建了乙個簡單的日誌系統,我們可以向多個接受者廣播訊息。在這篇文章我,我們將要新增一些功能使得針對部分訊息的接受成為可能,例如我們只對錯誤的訊息進行磁碟記錄,同時又可以把所有的訊息列印到螢幕上。在之前的案例中,我們已經建立了乙個繫結,可以重新呼叫如下的 channel.queueb...