第一節中我們實現了簡單的傳送接收訊息。現在我們建立乙個工作佇列,用於在多個worker中分配耗時任務。工作佇列是為了避免立即執行資源密集型任務並等待其完成,有了工作佇列,就可以稍後再處理任務。我們把任務封裝成訊息併發送到佇列,後台的工作程序從佇列中取出任務並執行。當有多個工作程序時,任務會在它們中分發。
準備用字串模擬複雜任務,通過time.sleep()函式模擬程序繁忙,字串中的點表示任務複雜度。
修改上節的傳送程式,實現從命令列輸入訊息併發送到佇列。
import sys
message = ' '.join(sys.argv[1:]) or "hello world!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.basicproperties(
delivery_mode = 2, # make message persistent
))print(" [x] sent %r" % message)
接收程式中增加延時。
import time
def callback(ch, method, properties, body):
print(" [x] received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] done")
輪詢分發
使用任務佇列的乙個優點就是方便水平擴充套件。如果系統負載過大就可以增加新的工作程序即可。
預設情況下,rabbitmq會順序的將訊息發給下乙個消費者,這種分發方式就是round-robin。平均下來每個消費者會獲得等量的訊息。
訊息確認
我們不希望丟失任何訊息,如果乙個worker掛掉,我們希望把訊息傳送給另乙個worker。
為了保證訊息不丟失,rabbitmq支援訊息確認。消費者會傳送ack訊息確認給rabbitmq,通知它訊息已收到並處理,rabbitmq可以刪除它。
如果消費者在傳送ack前掛掉(通道關閉、連線關閉、tcp連線丟失),rabbitmq認為訊息未被完全處理,並將訊息重新入隊,如果存在其他消費者,就將訊息重新投遞到其他消費者。這樣就保證訊息不丟失。
這裡沒有超時限制,消費者掛掉,rabbitmq就重新投遞訊息。訊息處理需要很長時間時也可以很好的支援。
預設情況下訊息確認是開啟的。前面的例子我們通過no_ack=true顯式關閉了。現在我們需要去掉該標誌,任務處理完傳送訊息確認。
def callback(ch, method, properties, body):
print " [x] received %r" % (body,)
time.sleep( body.count('.') )
print " [x] done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback, queue='hello')
以上**可以保證當工作程序處理訊息時被殺掉,不會丟訊息,一旦程式掛掉所有未確認的訊息都會被重新投遞。
忘記確認是乙個非常容易犯的常見錯誤,結果很嚴重。rabbitmq會占用越來越多的記憶體因為它不能釋放未確認的訊息。要除錯這種錯誤,可以使用rabbitmqctl列印未確認訊息:
sudo rabbitmqctl list_queue namemessages_ready messages_unacknowledged
訊息持久化
我們知道了如何在消費者掛掉的情況下確保不丟訊息,但是如果rabbitmq服務掛了,訊息還是會丟失,rabbitmq不會記住佇列及訊息,除非我們設定它。要確保不丟訊息,需要把佇列和訊息都設為可持久化durable。
rabbitmq中已經存在乙個非durable的佇列hello,以durable引數再次建立該佇列並不生效。rabbitmq不允許用不同的引數重定義乙個已經存在的佇列,會返回錯誤。
注意,將訊息標記為持久化並不能完全保證訊息不丟失。雖然該標誌通知rabbitmq將訊息儲存到磁碟,但是在rabbitmq收到訊息但還未儲存前還是有乙個小小的時間窗。而且rabbitmq並不對每個訊息都做fsync操作,可能訊息還只是儲存在快取中,並未真正寫入磁碟。但是這種持久化對簡單佇列來講已經足夠了,如果需要強永續性,可以使用publisher confirms。
公平分發
輪詢分發有個問題,假如有兩個消費者,且所有奇數訊息都很繁重,所有偶數訊息都很輕量,那麼乙個消費者就會一直很繁忙而另乙個則幾乎沒什麼工作量。rabbitmq則對此並不知道。
這是因為,在訊息進入佇列時rabbitmq就分發訊息,它並不看消費者的未確認訊息數量,只是盲目的把第n個訊息分配給第n個消費者。
為防止這種情況,我們可以使用basic.qos方法,引數prefetch_count=1。這是通知rabbitmq每次只給消費者乙個訊息。換句話說,在消費者處理完訊息傳送確認之前不再給他分發新訊息,而是把訊息分發給那些不繁忙的消費者。
當然,如果所有的消費者都繁忙,佇列可能會滿,這是應該考慮增加消費者或其它策略。
最終程式
傳送端:
import pika
import sys
connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=true)
message = ' '.join(sys.argv[1:]) or "hello world!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.basicproperties(
delivery_mode = 2, # make message persistent
))print(" [x] sent %r" % message)
connection.close()
接收端:
import pika
import time
connection = pika.blockingconnection(pika.connectionparameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=true)
print(' [*] waiting for messages. to exit press ctrl+c')
def callback(ch, method, properties, body):
print(" [x] received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
RabbitMQ官網教程5 topic
前面的章節我們改進了日誌系統,利用direct交換機,可以選擇性的接收日誌。但是,這仍然有侷限,不能基於多個規則進行路由。在日誌系統裡,我們可能不只根據安全級別訂閱日誌,還可能想根據日誌源來訂閱。就像unix 工具syslog,根據安全級別及裝置來路由日誌。這樣就比較靈活了。為了實現這種效果,我們需...
RabbitMQ官網教程4 路由
前面的章節我們建立了乙個簡單的日誌系統,可以把訊息廣播到許多接收者。本節我們將增加乙個特性 只訂閱一部分訊息。比如只把錯誤日誌輸出到檔案,同時把所有日誌輸出到螢幕。繫結 前面的例子裡我們已經建立過繫結。繫結就是交換機和佇列間的一種關係,簡單解讀為佇列關注該交換機的訊息。建立繫結時可以增加乙個引數ro...
官網指令碼快速安裝rabbitmq
最近有安裝一次rabbitmq公升級到3.8.5版本,這個安裝真的是省事簡單 直接用官方指令碼執行即可 啟動管理平台 sudo rabbitmq plugins enable rabbitmq management 啟停命令 sudo rabbitmq server start sudo rabbi...