rabbitmq中文教程python版 路由

2021-09-13 22:16:24 字數 4164 閱讀 7005

原始碼:

在之前的教程中,我們構建了乙個簡單的日誌系統 我們能夠將日誌訊息廣播給許多接收者。

在本教程中,我們將新增乙個功能 - 我們將只能訂閱一部分訊息。例如,我們只能將重要的錯誤訊息引導到日誌檔案(以節省磁碟空間),同時仍然能夠在控制台上列印所有日誌訊息。

在前面的例子中,我們已經建立了繫結。您可能會回想一下**:

channel.queue_bind(exchange=exchange_name,

queue=queue_name)

繫結是交換和佇列之間的關係。這可以簡單地理解為: the queue is interested in messages from this exchange.

繫結可以使用額外的routing_key引數。為了避免與basic_publish引數混淆,我們將其稱為繫結鍵。這就是我們如何使用乙個鍵建立乙個繫結:

channel.queue_bind(exchange=exchange_name,

queue=queue_name,

routing_key='black')

繫結鍵的含義取決於交換型別。我們之前使用的 fanout 交換簡單地忽略了它的價值。

我們之前教程的日誌記錄系統將所有訊息廣播給所有消費者。我們希望將其擴充套件為允許根據其進行嚴格的過濾訊息。

例如,我們可能希望將嚴重錯誤的日誌訊息寫入磁碟,而不會寫入警告或資訊日誌訊息。

我們將使用direct交換。direct交換背後的路由演算法很簡單 - 訊息進入佇列,其繫結金鑰與訊息的路由金鑰完全匹配。

為了說明這一點,請考慮以下設定:

在這個設定中,我們可以看到有兩個佇列繫結的直接交換機x. 第乙個佇列用繫結鍵orange繫結,第二個佇列有兩個繫結,乙個繫結鍵為black,另乙個為green。

在這種設定中,使用路由鍵orange發布到交換機的訊息 將被路由到佇列q1。帶有black或gree路由鍵的訊息將進入q2。所有其他訊息將被丟棄。

使用相同的繫結金鑰繫結多個佇列是完全合法的。在我們的例子中,我們可以使用繫結鍵black新增x和q1之間的繫結。

在這種情況下,direct交換就像fanout一樣,並將訊息廣播到所有匹配的佇列。帶有路由鍵black的訊息將傳送到q1和q2。

我們將使用這個模型用於我們的日誌系統。取而代之的fanout,我們將訊息傳送到direct交換。我們將提供嚴格的日誌作為路由鍵(routing key)。

這樣接收指令碼將能夠選擇想要接收的訊息。我們先關注發出日誌的實現。

像往常一樣,我們需要首先建立乙個交換:

channel.exchange_declare(exchange='direct_logs',

exchange_type='direct')

我們準備傳送一條訊息:

channel.basic_publish(exchange='direct_logs',

routing_key='',

body=message)

為了簡化事情,我們將假設「severity」可以是'info','warning','error'之一。

接收郵件的方式與上乙個教程中的一樣,只有乙個例外 - 我們將為每個我們感興趣的嚴重程度建立乙個新繫結。

emit_log_direct.py的**:

#!/usr/bin/env python

import sys

import pika

connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',

exchange_type='direct')

severity = sys.args[1:] if len(sys.ar**) > 2 else 'info'

message = ' '.join(sys.ar**[2:]) or 'hello world!'

channel.basic_publish(exchange='direct_logs',

routing_key=severity, body=message)

print(" [x] sent %r:%r" % (severity, message))

connection.close()

receive_logs_direct.py的**:

#!/usr/bin/env python

import sys

import pika

connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',

exchange_type='direct')

result = channel.queue_declare(exclusive=true)

queue_name = result.method.queue

severities = sys.ar**[1:]

if not severities:

sys.stderr.write("usage: %s [info] [warning] [error]\n" % sys.ar**[0])

sys.exit(1)

for severity in severities:

channel.queue_bind(exchange='direct_logs',

queue=queue_name,

routing_key=severity)

print(' [*] waiting for logs. to exit press ctrl+c')

def callback(cb, method, properities, body):

print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,

queue=queue_name,

no_ack=true)

channel.start_consuming()

如果只想儲存'warning'和'error'(而不是'info')將訊息記錄到檔案中,只需開啟乙個控制台並輸入:

python receive_logs_direct.py warning error > logs_from_rabbit.log
如果您希望在螢幕上看到所有日誌訊息,請開啟乙個新終端並執行以下操作:

python receive_logs_direct.py info warning error
例如,要輸出error日誌訊息,只需輸入:

python emit_log_direct.py error "run. run. or it will explode."

Spring AOP中文教程

僅僅用配置檔案便可把程式的每一部分組裝起來。四個bean定義的次序並不重要。我們現在有了乙個advice,乙個包含了正規表示式pointcut的advisor,乙個主程式類和乙個配置好的介面,通過工廠ctx,這個介面返回自己本身實現的乙個引用。beanimpl和testbeforeadvice都是直...

Beautiful Soup 中文教程

beautiful soup 是乙個處理python html xml的模組,功能相當強勁,最近仔細的看了一下他的幫助文件,終於看明白了一些。準備好好研究一下,順便將beautiful soup的一些用法整理一下,放到這個wiki上面,那個文件確實不咋地。beautiful soup 中文教程 的官...

kafka中文教程

本網翻譯整理apache kafka,提供整理apache kafka的完整學習文件。資料流,如訊息傳遞系統 高效並實時 資料流安全地在分布式集群中複製儲存 kafka是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點,並已在成千上萬家公司執行。apach...