參考:
rabbitmq是乙個訊息中介軟體(message broker),它接受和**訊息。類似郵局的功能。
使用的術語:
procucer-傳送訊息的就是生產者。
queue-訊息儲存在佇列中,佇列就是乙個大型的訊息快取。多個生產者可以將訊息傳送給乙個佇列,多個消費者可以嘗試從乙個佇列接受資料。
consume-消費者是等待接受訊息的程式
note:producer,consumer,broker在絕大部分應用中,分布在不同的主機上。
生產者向佇列『hello』傳送訊息,生產者從佇列接受訊息。
中間的box是queue--維持在消費者一端的訊息快取。
import pika#向乙個指定位址上的broker創立連線
conncetion = pika.blockingconnection(pika.connectionparameters('localhost'))
channel = conncetion.channel()
#傳送訊息之前需要確認接收端佇列存在
#如果傳送資料到乙個不存在的位置,rabbitmq會丟掉訊息
#在這裡我們宣告乙個hello佇列來投遞訊息
channel.queue_declare('hello')
#現在傳送乙個訊息到hello這個佇列。
#在rabbitmq中,乙個訊息無法直接傳送帶佇列,而是需要通過乙個exchange。
#目前只需要知道如何使用乙個預設的由乙個空字串認證的exchange,它允許指定訊息要發到哪個佇列
#佇列名在route-key引數中指定
channel.basic_publish(exchange='',routing_key='hello',body='hello world')
#退出程式之前需要確認網路快取被清空,並且訊息被投遞到rabbitmq。可以優雅地關閉連線
conncetion.close()
import pika當消費者啟動,就會開始等待從hello佇列接受訊息,並且迴圈接受。每當佇列中產生資料,就會到達消費者。#連線到rabbitmq server
conncetion = pika.blockingconnection(pika.connectionparameters('localhost'))
channel = conncetion.channel()
#確認接收端佇列存在
channel.queue_declare('hello')
#從佇列接受訊息較為複雜,它通過向佇列訂閱乙個**函式來工作
#每當我們接受乙個訊息,pika庫會呼叫**函式
#在這個case中,函式會列印訊息內容
def callback(ch, method, properties, body):
print("received %r" % body)
#告知rabbitmq,指定的**函式要從hello佇列中接受訊息
#如果訂閱的queue不存在,這一步會失敗
channel.basic_consume(callback,queue='hello',no_ack=true)
#這個函式會一直等待資料,並在需要時執行**函式
print('waiting for maeeage')
channel.start_consuming()
如果從同乙個佇列獲取資料的消費者有多個,那就會預設使用輪訓機制獲取資料。
在應用場景中,生產者相當於客戶端,消費者相當於服務端。生產者使用basix_publish函式傳送資料到佇列,然後消費者使用basic_consume從佇列中獲取資料,並呼叫**函式對資料進行處理。
在流程上,生產者傳送訊息到佇列,然後消費者使用**函式處理完後,預設情況下,在basic_consume函式中,no_ack=false,就是會向生產者傳送清楚處理完成確認訊息。生產者收到該訊息,就會刪除佇列中的訊息。
如果消費者在處理過程中宕機,rabbitmq檢測到sokcet連線斷了,就會把訊息發到下乙個輪訓點。
在第二大點的案例中,加上了no_ack=true,就是消費者不會向生產者傳送處理完確認資訊。這樣,如果消費者在處理過程中宕機,生產者也會刪除佇列中的訊息。這適合於不看重處理結果的請求。
當rabbitmq服務宕機,佇列資訊就會丟失,在宣告佇列時加上持久化引數,會把佇列儲存(裡面的訊息依然不會儲存)
channel.queue_declare('hello',durable=true)會把訊息也持久化,需要在生產者的basic_publish函式裡加乙個引數
channel.basic_publish(exchange='',routing_key='hello',body='hello world',在rabbitmq的訊息模型中,核心特徵就是,生產者永遠不會直接向佇列傳送任何訊息,甚至大部分時候生產者都不知道乙個訊息是否會被投遞到佇列中。properties=pika.basicproperties(delivery_mode=2)
)
生產者只能把訊息發給exchange。exchange一面從生產者接受訊息,另一面把訊息推送給佇列。exchange必須準確知道該如何處理所收到的訊息。exchange type就是來定義處理方式的規則。
在第二大點,exchange='',這是預設的exchange,會按照route_key裡的queue名去傳送訊息(如果該queue存在的話)
fanout exchange很簡單,就是把自己從生產者收到的所有訊息廣播給它繫結的所有佇列。
import pikamessage='i am jabbok'
connect = pika.blockingconnection(pika.connectionparameters('localhost'))
channel = connect.channel()
#宣告乙個fanout的exchange型別,取名『logs』
#exchange會向所有與自己繫結的queue廣播自己收到的訊息
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#當exchange='',會根據route_key的值去查詢佇列
#而fanout型別的exchange,是向所有繫結的佇列傳送訊息。所以route_key=''
channel.basic_publish(exchange='logs',routing_key='',body=message)
channel.close()
import pika,time消費發布後,如果對端沒有訂閱者,佇列馬上刪除。這時再執行訂閱,也收不到訊息。conncetion = pika.blockingconnection(pika.connectionparameters('localhost'))
channel = conncetion.channel()
channel.exchange_declare(exchange='logs',exchange_type='fanout')
#在本例中,只需要當前的訊息,所以每次連線佇列都會清空
#為此建立的佇列使用的是隨機名,所以佇列宣告裡不需要queue引數
#加上exclusive唯一引數,與該隨機名queue連線的生產者斷開連線,該queue刪除
#result.method.queue包含乙個隨機的佇列名
result = channel.queue_declare(exclusive=true)
queue_name = result.method.queue
#繫結exchange和佇列,就是告訴exchange把訊息發給哪個queue
#這裡queue是個隨機名
channel.queue_bind(exchange='logs',queue=queue_name)
def callback(ch, method, properties, body):
print("received %r" % body)
channel.basic_consume(callback,queue=queue_name,no_ack=true)
print('waiting for maeeage')
channel.start_consuming()
在第二大點,同乙個訊息,消費者是輪訓接受。但在訂閱發布模型中,所有訂閱者都會收到同乙個訊息。
訊息中介軟體
1.訊息的優先順序 2.訊息排序 3.訊息過濾 4.訊息持久化 5.訊息重試 6.事務的支援 7.broker滿 生產者,佇列,消費者 訊息佇列的優點 1 解耦2 非同步訊息,系統響應 在jms中,有兩種訊息模型 點對點模式和發布訂閱模式。1.在點對點模式中 有三種角色 1 訊息佇列,傳送者,接受者...
訊息中介軟體
如何理解訊息中介軟體?訊息中介軟體是儲存訊息的乙個容器,與資料庫不同的是資料庫儲存的資料是可以被修改的,而訊息中介軟體一般不會被修改 訊息中介軟體在消費的生產者與消費者產生,相當於乙個中間人的角色,提供了路由保證訊息的傳遞,如果消費者不能及時接收,訊息會保留下來,知道消費者上線 保證在存活期內 訊息...
訊息中介軟體
訊息中介軟體是在訊息的傳輸過程中儲存訊息 訊息傳遞過程中不能更改 的容器。訊息中介軟體再將訊息從它的原中繼到它的目標時充當中間人的作用。訊息中介軟體的主要目的是提供路由並保證訊息的傳遞 如果傳送訊息時接收者不可用,訊息佇列會保留訊息,知道可以成功傳遞為止,當然,訊息佇列儲存訊息也是有期限的。訊息傳送...