rabbitmq訊息模型的核心理念是:發布者(producer)不會直接傳送任何訊息給佇列。事實上,發布者(producer)甚至不知道訊息是否已經被投遞到佇列。
發布者(producer)只需要把訊息傳送給乙個交換機(exchange)。交換機非常簡單,它一邊從發布者方接收訊息,一邊把訊息推送到佇列。交換機必須知道如何處理它接收到的訊息,是應該推送到指定的佇列還是是多個佇列,或者是直接忽略訊息。這些規則是通過交換機型別(exchange type)來定義的。
有幾個可供選擇的交換機型別:直連交換機(direct), 主題交換機(topic), (頭交換機)headers和 扇型交換機(fanout)。我們在這裡主要說明最後乙個 —— 扇型交換機(fanout)。先建立乙個fanout型別的交換機,命名為logs:
扇型交換機(fanout)很簡單,你可能從名字上就能猜測出來,它把訊息傳送給它所知道的所有佇列。
'''
訊息發布者
利用fanout扇型交換機,實現網路多播功能
只要訂閱者訂閱了目前交換機,都能收到發布的訊息
'''import pika
exchangename = "test01"
if __name__ == '__main__':
connection = pika.blockingconnection(pika.connectionparameters(host="192.168.1.116"))
channel = connection.channel()
#定義乙個交換機
channel.exchange_declare(exchange=exchangename, #交換機名字
exchange_type = "fanout", #交換機型別
)channel.basic_publish(exchange=exchangename,
routing_key="", #不指定佇列名字,意思就是發布到所有繫結在該台交換機佇列
body="begin test")
connection.close()
'''
訊息訂閱者
利用fanout扇型交換機,實現網路多播功能
只要訂閱者訂閱了目前交換機,都能收到發布的訊息
佇列名字已經沒有多少用處
'''import pika
exchangename = "test01"
if __name__ == '__main__':
connection = pika.blockingconnection(pika.connectionparameters(host="192.168.1.116"))
channel = connection.channel()
#定義交換機
channel.exchange_declare(exchange=exchangename,
type="fanout")
#定義乙個隨機名字佇列
result = channel.queue_declare(exclusive=true) #系統隨機生成佇列名字,並返回
queue_name = result.method.queue
#繫結佇列到交換器上
channel.queue_bind(exchange=exchangename,
queue=queue_name)
def callback(ch,method,properties,body):
print(body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=false)
channel.start_consuming()
RabbitMQ 廣播訊息
定義 廣播訊息是指生產者產生的訊息將分發給所有訂閱這個訊息的消費者,而普通的模式是 一批訊息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模組就是共同消費模組 而今天說的是廣播訊息,它是指一些訊息同時被推送到多個訂閱者,而這些訂閱者收...
RabbitMQ 廣播訊息
定義 廣播訊息是指生產者產生的訊息將分發給所有訂閱這個訊息的消費者,而普通的模式是 一批訊息可以被多個人共同消費,如consumer1可能消費1,3,5記錄,而consumer2可能消費的是2,4,6這種模組就是共同消費模組 而今天說的是廣播訊息,它是指一些訊息同時被推送到多個訂閱者,而這些訂閱者收...
rabbitmq 廣播方式
通過休息佇列控制使用者上線下線 建立交換機 bean name oninechange public fanoutexchange onlineexchange 建立成功 fanout exchange name return new fanoutexchange fanout exchange n...