rabbitmq訊息傳遞模型的核心思想是生產者從不將任何訊息直接傳送到佇列。實際上,生產者經常甚至根本不知道是否將訊息傳遞到任何佇列。
send.py
import pika
connection = pika.blockingconnection(
pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='hello world!')
print(" [x] sent 'hello world!'")
connection.close()
recive.py
import pika, sys, os
def main():
connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=true)
print(' [*] waiting for messages. to exit press ctrl+c')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except keyboardinterrupt:
print('interrupted')
try:
sys.exit(0)
except systemexit:
os._exit(0)
producer exchange、routing_key queue consumer
producer 將訊息發往exchange
預設使用的是direct型別,exchange為空,根據routing_key找到queue或者建立queue
channel.basic_publish(exchange='', routing_key='hello', body='hello world!')
生產者只能將訊息傳送到exchange,交流是一件非常簡單的事情。一方面,它接收來自生產者的訊息,另一方面,將它們推入佇列。交易所必須確切知道如何處理收到的訊息。是否應將其附加到特定佇列?是否應該將其附加到許多佇列中?還是應該丟棄它。規則由交換型別定義 。
有幾種交換型別可用:direct,topic,headers 和fanout。我們將集中討論最後乙個-fanout。讓我們建立該型別的交換,並將其稱為log:
channel.exchange_declare(exchange = 'logs',
exchange_type = 'fanout')
發布或者訂閱時指定routing_key
exchange 在direct型別時:routing_key 和exchange 訊息所屬佇列;
exchange 在topic 模式下時:
"#":匹配所有的路由鍵
"work.#":無匹配
exchangezai fanout型別時:
到達該exchange的所有訊息將會傳送到繫結到該exchange的所有佇列
訊息被否定確認,使用channel.basic_nack
或channel.basic_reject
,並且此時requeue
屬性被設定為false
。
訊息在佇列的存活時間超過設定的ttl時間。
訊息佇列的訊息數量已經超過最大佇列長度。
那麼該訊息將成為「死信」。
「死信」訊息會被rabbitmq進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。
死信訊息的生命週期:
業務訊息被投入業務佇列
消費者消費業務佇列的訊息,由於處理過程中發生異常,於是進行了nck或者reject操作
被nck或reject的訊息由rabbitmq投遞到死信交換機中
死信交換機將訊息投入相應的死信佇列
死信佇列的消費者消費死信訊息
延時佇列
,首先,它是一種佇列,佇列意味著內部的元素是有序
的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出。
其次,延時佇列
,最重要的特性就體現在它的延時
屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理
,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務。
簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列。
官網基礎
可靠訊息|死信佇列|延時佇列
RabbitMq死信佇列
死信交換機有什麼用呢?在建立佇列的時候 可以給這個佇列附帶乙個交換機,那麼這個佇列作廢的訊息就會被重新發到附帶的交換機,然後讓這個交換機重新路由這條訊息。通俗的說,就是訊息產生之後,因為設定了超時時間,在這段時間內訊息沒有被消費就會被扔到死信佇列裡面。交換機名稱 private static fin...
rabbitmq死信佇列
死信佇列 dlx dead letter exchange 利用dlx,當訊息在乙個佇列中變成死信 dead message 之後,它能重新publish到另外乙個exchange,這個exchange就是dxl 訊息變成死信的幾種情況 訊息被拒絕 basic.reject basic.nack 並...
rabbitmq死信佇列
概念 當訊息成為死信時,會將該訊息放到死信交換機當中,這個交換機也繫結的其他佇列,還可以繼續進行消費。訊息什麼時候會變成死信 在配置檔案宣告佇列時指定死信交換機的名稱和死信交換機的路由key key x dead letter exchange value 死信交換機名稱 key x dead le...