RabbitMQ官網教程5 topic

2021-07-28 04:47:26 字數 2431 閱讀 4362

前面的章節我們改進了日誌系統,利用direct交換機,可以選擇性的接收日誌。但是,這仍然有侷限,不能基於多個規則進行路由。

在日誌系統裡,我們可能不只根據安全級別訂閱日誌,還可能想根據日誌源來訂閱。就像unix 工具syslog,根據安全級別及裝置來路由日誌。這樣就比較靈活了。

為了實現這種效果,我們需要學習topic交換機。

topic交換機

傳送到topic交換機的訊息,其routing_key不能是任意值,必須是由」 . 「限定的一組詞。routing_key中可以有很多詞,但長度不能超過255位元組。

binding key也是相同的形式。topic交換的邏輯與direct交換機類似,特定routing_key的訊息被投遞到所有用匹配的bindingkey進行繫結的佇列。對於binding key有兩個重要的特別之處:

* 表示乙個詞

# 表示0個或多個詞

例子:是否投遞到佇列

訊息的routing key

q1 binding key = *.orange.*

q2 binding key = *.*.rabbit

lazy.#

quick.orange.rabbit 是

是lazy.orange.elephant 是

是quick.orange.fox 是

否lazy.brown.fox 否

是lazy.pink.rabbit 否

是(匹配兩個繫結,也只投遞一次)

quick.brown.fox 否

否orange 否

否quick.orange.male.rabbit 否

否lazy.orange.male.rabbit 否

是 topic交換機很強大,可以像其他交換機一樣工作。如果乙個佇列用」#」繫結,它將接收所有訊息,而不管routing key,就像fanout交換機。如果繫結中沒有用*和#,topic交換機就跟direct交換機一樣。

import pika

import sys

connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'hello world!'

channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)

print(" [x] sent %r:%r" % (routing_key, message))

connection.close()

import pika

import sys

connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', type='topic')

result = channel.queue_declare(exclusive=true)

queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:

sys.stderr.write("usage: %s [binding_key]...\n" % sys.argv[0])

sys.exit(1)

for binding_key in binding_keys:

channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] waiting for logs. to exit press ctrl+c')

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=true)

channel.start_consuming()

RabbitMQ官網教程2 工作佇列

第一節中我們實現了簡單的傳送接收訊息。現在我們建立乙個工作佇列,用於在多個worker中分配耗時任務。工作佇列是為了避免立即執行資源密集型任務並等待其完成,有了工作佇列,就可以稍後再處理任務。我們把任務封裝成訊息併發送到佇列,後台的工作程序從佇列中取出任務並執行。當有多個工作程序時,任務會在它們中分...

RabbitMQ官網教程4 路由

前面的章節我們建立了乙個簡單的日誌系統,可以把訊息廣播到許多接收者。本節我們將增加乙個特性 只訂閱一部分訊息。比如只把錯誤日誌輸出到檔案,同時把所有日誌輸出到螢幕。繫結 前面的例子裡我們已經建立過繫結。繫結就是交換機和佇列間的一種關係,簡單解讀為佇列關注該交換機的訊息。建立繫結時可以增加乙個引數ro...

官網指令碼快速安裝rabbitmq

最近有安裝一次rabbitmq公升級到3.8.5版本,這個安裝真的是省事簡單 直接用官方指令碼執行即可 啟動管理平台 sudo rabbitmq plugins enable rabbitmq management 啟停命令 sudo rabbitmq server start sudo rabbi...