在前面的教程中,我們實現了乙個簡單的日誌系統。可以把日誌訊息廣播給多個接收者。
本篇教程中我們打算新增乙個功能 —— 使得它能夠只訂閱訊息的乙個字集。例如,我們只需要把嚴重的錯誤日誌資訊寫入日誌檔案(儲存到磁碟),但同時仍然把所有的日誌資訊輸出到控制台中
前面的例子,我們已經建立過繫結(bindings),**如下:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
繫結(binding)是指交換機(exchange)和佇列(queue)的關係。可以簡單理解為:這個佇列(queue)對這個交換機(exchange)的訊息感興趣。
繫結的時候可以帶上乙個額外的routing_key引數。為了避免與basic_publish的引數混淆,我們把它叫做繫結鍵(binding key)。以下是如何建立乙個帶繫結鍵的繫結。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
繫結鍵的意義取決於交換機(exchange)的型別。我們之前使用過的扇型交換機(fanout exchanges)會忽略這個值。
我們的日誌系統廣播所有的訊息給所有的消費者(consumers)。我們打算擴充套件它,使其基於日誌的嚴重程度進行訊息過濾。例如我們也許只是希望將比較嚴重的錯誤(error)日誌寫入磁碟,以免在警告(warning)或者資訊(info)日誌上浪費磁碟空間。
我們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能做的僅僅是廣播。
我們將會使用直連交換機(direct exchange)來代替。路由的演算法很簡單 —— 交換機將會對繫結鍵(binding key)和路由鍵(routing key)進行精確匹配,從而確定訊息該分發到哪個佇列。
下圖能夠很好的描述這個場景:
在這個場景中,我們可以看到直連交換機 x和兩個佇列進行了繫結。第乙個佇列使用orange作為繫結鍵,第二個佇列有兩個繫結,乙個使用black作為繫結鍵,另外乙個使用green。
這樣以來,當路由鍵為orange的訊息發布到交換機,就會被路由到佇列q1。路由鍵為black或者green的訊息就會路由到q2。其他的所有訊息都將會被丟棄。
多個佇列使用相同的繫結鍵是合法的。這個例子中,我們可以新增乙個x和q1之間的繫結,使用black繫結鍵。這樣一來,直連交換機就和扇型交換機的行為一樣,會將訊息廣播到所有匹配的佇列。帶有black路由鍵的訊息會同時傳送到q1和q2。
我們將會傳送訊息到乙個直連交換機,把日誌級別作為路由鍵。這樣接收日誌的指令碼就可以根據嚴重級別來選擇它想要處理的日誌。我們先看看傳送日誌。
我們需要建立乙個交換機(exchange):
channel.exchange_declare(exchange='direct_logs',
type='direct')
然後我們傳送一則訊息:
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
我們先假設「severity」的值是info、warning、error中的乙個。
處理接收訊息的方式和之前差不多,只有乙個例外,我們將會為我們感興趣的每個嚴重級別分別建立乙個新的繫結。
emit_log_direct.py的**:
#!/usr/bin/env python
import pika
import sys
connection = pika.blockingconnection(pika.connectionparameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1
else
'info'
message = ' '.join(sys.argv[2:]) or
'hello world!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print
" [x] sent %r:%r" % (severity, message)
connection.close()
receive_logs_direct.py的**:
#!/usr/bin/env python
import pika
import sys
connection = pika.blockingconnection(pika.connectionparameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=true)
queue_name = result.method.queue
severities = sys.argv[1:]
ifnot severities:
print >> sys.stderr, "usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print
' [*] waiting for logs. to exit press ctrl+c'
defcallback
(ch, method, properties, body):
print
" [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=true)
channel.start_consuming()
如果你希望只是儲存warning和error級別的日誌到磁碟,只需要開啟控制台並輸入:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你希望所有的日誌資訊都輸出到螢幕中,開啟乙個新的終端,然後輸入:
$ python receive_logs_direct.py info warning error
[*] waiting for logs. to exit press ctrl+c
如果要觸發乙個error級別的日誌,只需要輸入:
$ python emit_log_direct.py error "run. run. or it will explode."
[x] sent 'error':'run. run. or it will explode.'
RabbitMQ路由模式
模型 路由模式與發布訂閱模式非常相似,但是路由模式增加了路由鍵的配置,生產者在傳送訊息到交換機的時候會指定乙個routingkey,用於匹配佇列,同時佇列繫結到交換機時也會指定routingkey,交換機在向佇列傳送訊息時,會根據routingkey來匹配。生產者public class provi...
ISIS路由協議理論
clns 無連線網路服務 1 clns相當於ip協議簇 2 clnp 無連線網路協議 相當於ip協議,clnp位址相當於ip位址。3 es endpoint system 相當於pc和伺服器 is internal system 相當於路由器,就是三層裝置。4 es is就是指在es和is之間需要建...
rabbitmq 理論 主題交換機
儘管直連交換機能夠改善我們的系統,但是它也有它的限制 沒辦法基於多個標準執行路由操作。在我們的日誌系統中,我們不只希望訂閱基於嚴重程度的日誌,同時還希望訂閱基於傳送 的日誌。unix工具syslog就是同時基於嚴重程度 severity info warn crit.和 裝置 facility au...