之前只是用celery, 這次用一下pika
參考rabbitmq官網的python版,
沒想到各種坑.
如果說rabbitmq官網是為了讓新人入門,所以刻意忽略掉細節, 那麼必須吐槽pika的官方文件, 很不好.遠不如celery
使用pika 的blockingconnection
但啟動後不久, 作為publish的生產端就會掉線:
raise self._closed_result.value.error根據pika.exceptions.streamlosterror: stream connection lost: brokenpipeerror(32, 'broken pipe')
是要在連線時設定心跳為0,就不會超時自動下線了, 否則rabbitmq伺服器會發過來預設值580
#--------------rabbitmq------------------
import
pika
connection =pika.blockingconnection(
pika.connectionparameters(
host='
localhost',
heartbeat=0, #
never exit after start
))channel =connection.channel()
channel.queue_declare(queue='
update_sql
')
這個錯誤在測試消費端時沒測出來,因為測試使用的發布者和官方文件裡一樣,發完就下線退出了. 這樣當然看不出這個心跳問題.
但是聯調時就暴露了. 真無語.
預設的body是二進位制的. 然後消費端要
body.decode('utf-8')
結果忽然發現 官方**示例裡這麼寫
channel.basic_publish('exchange_name',
'routing_key',
'test message',
pika.basicproperties(content_type='text/plain'
, type='
example
'))
這似乎時可以發文字的嗎?
然後,看見別人還可以這麼寫
似乎就是html請求頭常見的寫法了? 但是pika裡沒有對basicproperties的詳細文件,
,原始碼裡也看不出注釋
ack防止消費者出問題, durable防止rabbitmq伺服器本身出問題
所以ack在消費端定義
channel.basic_consume(queue='update_sql',
auto_ack
=false,
on_message_callback=callback)
而durable在channel裡佇列宣告裡 在 生產端,消費端都要統一宣告佇列
channel.queue_declare(queue='update_sql
', durable=true, exclusive=false, auto_delete=false)
引用
ack只在消費者這裡加上basic_qos就可以了rabbitmq是預設開啟自動應答的,這樣當rabbitmq將訊息發給消費者,就會從記憶體中將訊息刪除,這樣會帶來乙個問題,如果消費者未處理完訊息而宕機,那麼訊息就會丟失。所以,我們將自動應答關閉,當rabbitmq收到消費者處理完訊息的回應後才會從記憶體中刪除訊息。
durable
rabbitmq預設將訊息儲存在記憶體中,若rabbitmq宕機,那麼所有資料就會丟失,所以在宣告佇列的時候可以宣告將資料持久化,但是如果已經宣告了乙個未持久化的佇列,那麼不能修改,只能將這個佇列刪除或重新宣告乙個持久化資料。
connection =pika.blockingconnection(pika.connectionparameters(
host=self.host_rabbitmq,
heartbeat = 0, #
never exit after start
)) channel =connection.channel()
#durable 佇列中訊息持久化
#exclusive (bool) – don』t allow other consumers on the queue
#./ exchange 不支援 exclusive
channel.queue_declare(queue='
update_sql
', durable=true, exclusive=false, auto_delete=false)
#1次1條訊息channel.basic_qos(prefetch_count=1)channel.basic_consume(queue='
update_sql
',auto_ack=false, #
不自動確認 在callback最後確認 等於 no_ack
on_message_callback=self.callback)
print('
[*] wg-executor waiting for sql cmds. to exit press ctrl+c')
channel.start_consuming()
此外,在消費者的callback函式裡,
最好在最外層用 異常處理包裹起來,確保無論執行結果如何,都在finally裡執行ack
try:
except:
else:
finally:
#不論當前訊息是否成功,都表示訊息確實處理完了 手動確認 否則沒有ack不再傳送新訊息 保證確實被處理了再確認
ch.basic_ack(delivery_tag = method.delivery_tag)
阻塞與非阻塞賦值
李秋鳳,華清遠見嵌入式學院 講師。稍微接觸過verilog hdl的都對阻塞與非阻塞賦值略知一二,也是我們經常強調的重點之一,在課堂上還是有學員問什麼不一樣呢,為什麼我用阻塞賦值也能得出正確的結果呢?在編寫可綜合 的時候,建議大家不要忘了開啟rtl網表檢視器看看我們自己綜合出來的電路是不是自己想要的...
socket connect 阻塞與非阻塞
socket函式生成socket結構體時,預設生成的socket是阻塞的 如果我們使用connect去連線伺服器,而這時網路出現故障,則connect預設等候很長一段時間然後返回錯誤 我們可以設定socket為非阻塞模式,可以設定一定的等候時間,如果在設定的等候時間內connect失敗,則我們判定網...
阻塞與非阻塞I O
還記得上篇 我們講到的是linux中併發控制訪問的手段有哪些?原子 訊號量 自旋鎖 互斥體。這是為了保護臨界區的資源,是多個程序對共享資源的併發訪問的一種處理手段。但是,在驅動程式中,我們常常為了支援使用者空間對裝置的靈活訪問,引入了阻塞與非阻塞i o兩種不同模式。阻塞操作是指在執行裝置操作時若不能...