原始碼:
在之前的教程中,我們構建了乙個簡單的日誌系統 我們能夠將日誌訊息廣播給許多接收者。
在本教程中,我們將新增乙個功能 - 我們將只能訂閱一部分訊息。例如,我們只能將重要的錯誤訊息引導到日誌檔案(以節省磁碟空間),同時仍然能夠在控制台上列印所有日誌訊息。
在前面的例子中,我們已經建立了繫結。您可能會回想一下**:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
繫結是交換和佇列之間的關係。這可以簡單地理解為: the queue is interested in messages from this exchange.
繫結可以使用額外的routing_key引數。為了避免與basic_publish引數混淆,我們將其稱為繫結鍵。這就是我們如何使用乙個鍵建立乙個繫結:
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
繫結鍵的含義取決於交換型別。我們之前使用的 fanout 交換簡單地忽略了它的價值。
我們之前教程的日誌記錄系統將所有訊息廣播給所有消費者。我們希望將其擴充套件為允許根據其進行嚴格的過濾訊息。
例如,我們可能希望將嚴重錯誤的日誌訊息寫入磁碟,而不會寫入警告或資訊日誌訊息。
我們將使用direct交換。direct交換背後的路由演算法很簡單 - 訊息進入佇列,其繫結金鑰與訊息的路由金鑰完全匹配。
為了說明這一點,請考慮以下設定:
在這個設定中,我們可以看到有兩個佇列繫結的直接交換機x. 第乙個佇列用繫結鍵orange繫結,第二個佇列有兩個繫結,乙個繫結鍵為black,另乙個為green。
在這種設定中,使用路由鍵orange發布到交換機的訊息 將被路由到佇列q1。帶有black或gree路由鍵的訊息將進入q2。所有其他訊息將被丟棄。
使用相同的繫結金鑰繫結多個佇列是完全合法的。在我們的例子中,我們可以使用繫結鍵black新增x和q1之間的繫結。
在這種情況下,direct交換就像fanout一樣,並將訊息廣播到所有匹配的佇列。帶有路由鍵black的訊息將傳送到q1和q2。
我們將使用這個模型用於我們的日誌系統。取而代之的fanout,我們將訊息傳送到direct交換。我們將提供嚴格的日誌作為路由鍵(routing key)。
這樣接收指令碼將能夠選擇想要接收的訊息。我們先關注發出日誌的實現。
像往常一樣,我們需要首先建立乙個交換:
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
我們準備傳送一條訊息:
channel.basic_publish(exchange='direct_logs',
routing_key='',
body=message)
為了簡化事情,我們將假設「severity」可以是'info','warning','error'之一。
接收郵件的方式與上乙個教程中的一樣,只有乙個例外 - 我們將為每個我們感興趣的嚴重程度建立乙個新繫結。
emit_log_direct.py的**:
#!/usr/bin/env python
import sys
import pika
connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
severity = sys.args[1:] if len(sys.ar**) > 2 else 'info'
message = ' '.join(sys.ar**[2:]) or 'hello world!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity, body=message)
print(" [x] sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py的**:
#!/usr/bin/env python
import sys
import pika
connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
result = channel.queue_declare(exclusive=true)
queue_name = result.method.queue
severities = sys.ar**[1:]
if not severities:
sys.stderr.write("usage: %s [info] [warning] [error]\n" % sys.ar**[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print(' [*] waiting for logs. to exit press ctrl+c')
def callback(cb, method, properities, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=true)
channel.start_consuming()
如果只想儲存'warning'和'error'(而不是'info')將訊息記錄到檔案中,只需開啟乙個控制台並輸入:
python receive_logs_direct.py warning error > logs_from_rabbit.log如果您希望在螢幕上看到所有日誌訊息,請開啟乙個新終端並執行以下操作:
python receive_logs_direct.py info warning error例如,要輸出error日誌訊息,只需輸入:
python emit_log_direct.py error "run. run. or it will explode."
Spring AOP中文教程
僅僅用配置檔案便可把程式的每一部分組裝起來。四個bean定義的次序並不重要。我們現在有了乙個advice,乙個包含了正規表示式pointcut的advisor,乙個主程式類和乙個配置好的介面,通過工廠ctx,這個介面返回自己本身實現的乙個引用。beanimpl和testbeforeadvice都是直...
Beautiful Soup 中文教程
beautiful soup 是乙個處理python html xml的模組,功能相當強勁,最近仔細的看了一下他的幫助文件,終於看明白了一些。準備好好研究一下,順便將beautiful soup的一些用法整理一下,放到這個wiki上面,那個文件確實不咋地。beautiful soup 中文教程 的官...
kafka中文教程
本網翻譯整理apache kafka,提供整理apache kafka的完整學習文件。資料流,如訊息傳遞系統 高效並實時 資料流安全地在分布式集群中複製儲存 kafka是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點,並已在成千上萬家公司執行。apach...