pika 與 rabbitMQ 阻塞連線

2022-06-21 16:24:11 字數 3175 閱讀 3170

之前只是用celery, 這次用一下pika

參考rabbitmq官網的python版,

沒想到各種坑.

如果說rabbitmq官網是為了讓新人入門,所以刻意忽略掉細節, 那麼必須吐槽pika的官方文件, 很不好.遠不如celery

使用pika 的blockingconnection

但啟動後不久, 作為publish的生產端就會掉線:

raise self._closed_result.value.error

pika.exceptions.streamlosterror: stream connection lost: brokenpipeerror(32, 'broken pipe')

根據

是要在連線時設定心跳為0,就不會超時自動下線了, 否則rabbitmq伺服器會發過來預設值580

#

--------------rabbitmq------------------

import

pika

connection =pika.blockingconnection(

pika.connectionparameters(

host='

localhost',

heartbeat=0, #

never exit after start

))channel =connection.channel()

channel.queue_declare(queue='

update_sql

')

這個錯誤在測試消費端時沒測出來,因為測試使用的發布者和官方文件裡一樣,發完就下線退出了. 這樣當然看不出這個心跳問題.

但是聯調時就暴露了. 真無語.

預設的body是二進位制的. 然後消費端要

body.decode('utf-8')

結果忽然發現 官方**示例裡這麼寫

channel.basic_publish('

exchange_name',

'routing_key',

'test message',

pika.basicproperties(content_type='text/plain'

, type='

example

'))

這似乎時可以發文字的嗎?

然後,看見別人還可以這麼寫

似乎就是html請求頭常見的寫法了? 但是pika裡沒有對basicproperties的詳細文件,

,原始碼裡也看不出注釋

ack防止消費者出問題, durable防止rabbitmq伺服器本身出問題

所以ack在消費端定義

channel.basic_consume(queue='

update_sql',

auto_ack

=false,

on_message_callback=callback)

而durable在channel裡佇列宣告裡 在 生產端,消費端都要統一宣告佇列

channel.queue_declare(queue='

update_sql

', durable=true, exclusive=false, auto_delete=false)

引用 

ack

rabbitmq是預設開啟自動應答的,這樣當rabbitmq將訊息發給消費者,就會從記憶體中將訊息刪除,這樣會帶來乙個問題,如果消費者未處理完訊息而宕機,那麼訊息就會丟失。所以,我們將自動應答關閉,當rabbitmq收到消費者處理完訊息的回應後才會從記憶體中刪除訊息。

durable

rabbitmq預設將訊息儲存在記憶體中,若rabbitmq宕機,那麼所有資料就會丟失,所以在宣告佇列的時候可以宣告將資料持久化,但是如果已經宣告了乙個未持久化的佇列,那麼不能修改,只能將這個佇列刪除或重新宣告乙個持久化資料。

只在消費者這裡加上basic_qos就可以了

connection =pika.blockingconnection(

pika.connectionparameters(

host=self.host_rabbitmq,

heartbeat = 0, #

never exit after start

)) channel =connection.channel()

#durable 佇列中訊息持久化

#exclusive (bool) – don』t allow other consumers on the queue

#./ exchange 不支援 exclusive

channel.queue_declare(queue='

update_sql

', durable=true, exclusive=false, auto_delete=false)

#1次1條訊息channel.basic_qos(prefetch_count=1)channel.basic_consume(queue='

update_sql

',auto_ack=false, #

不自動確認 在callback最後確認 等於 no_ack

on_message_callback=self.callback)

print('

[*] wg-executor waiting for sql cmds. to exit press ctrl+c')

channel.start_consuming()

此外,在消費者的callback函式裡,   

最好在最外層用 異常處理包裹起來,確保無論執行結果如何,都在finally裡執行ack

try:

except:

else:

finally:

#不論當前訊息是否成功,都表示訊息確實處理完了 手動確認 否則沒有ack不再傳送新訊息 保證確實被處理了再確認

ch.basic_ack(delivery_tag = method.delivery_tag)

阻塞與非阻塞賦值

李秋鳳,華清遠見嵌入式學院 講師。稍微接觸過verilog hdl的都對阻塞與非阻塞賦值略知一二,也是我們經常強調的重點之一,在課堂上還是有學員問什麼不一樣呢,為什麼我用阻塞賦值也能得出正確的結果呢?在編寫可綜合 的時候,建議大家不要忘了開啟rtl網表檢視器看看我們自己綜合出來的電路是不是自己想要的...

socket connect 阻塞與非阻塞

socket函式生成socket結構體時,預設生成的socket是阻塞的 如果我們使用connect去連線伺服器,而這時網路出現故障,則connect預設等候很長一段時間然後返回錯誤 我們可以設定socket為非阻塞模式,可以設定一定的等候時間,如果在設定的等候時間內connect失敗,則我們判定網...

阻塞與非阻塞I O

還記得上篇 我們講到的是linux中併發控制訪問的手段有哪些?原子 訊號量 自旋鎖 互斥體。這是為了保護臨界區的資源,是多個程序對共享資源的併發訪問的一種處理手段。但是,在驅動程式中,我們常常為了支援使用者空間對裝置的靈活訪問,引入了阻塞與非阻塞i o兩種不同模式。阻塞操作是指在執行裝置操作時若不能...