問題描述
在消費rabbitmq佇列時, 每次進入**函式內需要進行一些比較耗時的操作;操作完成後給rabbitmq server傳送ack訊號以dequeue本條訊息。
問題就發生在傳送ack操作時, 程式提示鏈結已被斷開或socket error。
原始碼示例
#!/usr/bin
#coding: utf-8
import pika
import time
user = 'guest'
pwd = 'guest'
test_queue = 'just4test'
def callback(ch, method, properties, body):
print(body)
time.sleep(600)
ch.basic_publish('', routing_key=test_queue, body="fortest")
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.blockingconnection(
pika.connectionparameters('127.0.0.1',
credentials=pika.plaincredentials(user, pwd)))
chan = s_conn.channel()
chan.queue_declare(queue=test_queue)
chan.basic_publish('', routing_key=test_queue, body="fortest")
chan.basic_consume(callback, queue=test_queue)
chan.start_consuming()
if __name__ == "__main__":
test_main()
執行一段時間後, 就會報錯:
[error][pika.adapters.base_connection][2017-08-18 12:33:49]error event 25, none
[critical][pika.adapters.base_connection][2017-08-18 12:33:49]tried to handle an error where no error existed
[error][pika.adapters.base_connection][2017-08-18 12:33:49]fatal socket error: brokenpipeerror(32, 'broken pipe')
問題排查
猜測:pika客戶端沒有及時傳送心跳,連線被server斷開
一開始修改了heartbeat_interval引數值, 示例如下:
def test_main():
s_conn = pika.blockingconnection(
pika.connectionparameters('127.0.0.1',
hearrzsgetbeat_inwww.cppcns.comterval=10,
socket_timeout=5,
credentials=pika.plaincredentials(user, pwd)))
# ....
修改後執行依程式設計客棧然報錯,後來想想應該單執行緒被一直占用,pika無法傳送心跳;
於是又加了個心跳執行緒, 示例如下:
#!/usr/bin
#coding: utf-8
import pika
import time
import logging
import threading
user = 'guest'
pwd = 'guest'
test_queue = 'just4test'
class heartbeat(threading.thread):
def __init__(self, connection):
super(heartbeat, self).__init__()
self.lock = threading.lock()
self.connection = connection
self.quitflag = false
self.stopflag = true
self.setdaemon(true)
def run(self):
while not self.quitflag:
time.sleep(10)
self.lock.acquire()
if self.stopflag :
self.lock.release()
continue
try:
self.connection.process_data_events()
except exception as ex:
logging.warn("error format: %s"%(str(ex)))
self.lock.release()
return
self.lock.release()
def startheartbeat(self):
self.lock.acquire()
if self.quitflag==true:
self.lock.release()
return
self.stopflag=false
self.lock.release()
def callback(ch, method, properties, body):
logging.info("recv_body:%s" % body)
time.sleep(600)
ch.basic_ack(delivery_tag = method.delivery_tag)
def test_main():
s_conn = pika.blockingconnection(
pika.connectionparameters('127.0.0.1',
heartbeat_interval=10,
socket_timeout=5,
credentials=pika.plaincredentials(user, pwd)))
chan = s_conn.channel()
chan.queue_declare(queue=test_queue)
chan.basic_consume(callback,
queue=test_queue)
heartbeat = heartbeat(s_conn)
heartbeat.start() #開啟心跳執行緒
heartbeat.程式設計客棧startheartbeat()
chan.start_consuming()
if __name__ == "__main__":
test_main()
嘗試執行,結果還是不行,不得不安靜下來思考自己是不是想錯了。
去看它的api,看到heartbeat_interval的解析:
:param int heartbeat_int程式設計客棧erval: how often to send heartbeats.
min between this value and server's proposal
will be used. use 0 to deactivate heartbeats
and none to accept server's proposal.
按這樣說法,應該還是沒有把心跳值給設定好。上面的程式期望是10秒發一次心跳,但是理論上傳送心跳的間隔會比10秒多一點。所以艾瑪,我應該是把heartbeat_interval的作用搞錯了, 它是指超過這個時間間隔不發心跳或不給server任何資訊,server就會斷開連線, 而不是說pika會按這個間隔來發心跳。 結果我把heartbeat_interval值設定高一點(比實際傳送心跳/資訊的間隔更長),比如上面設定成60秒,就正常執行了。
如果不指定heartbeat_interval, 它預設為none, 意味著按rabbitmq server的配置來檢測心跳是否正常。
如果設定heartbeat_interval=0, 意味著不檢測心跳,server端將不會主動斷開連線。
本文標題: 解決python3 pika之連線斷開的問題
本文位址:
CSS3之hyphens(連字元)
1 連字元 連字元分為硬連字元和軟連字元 硬連字元 hyphen 即使該單詞沒有中間換行,也會顯示連字元 軟連字元 shy 只有單詞中間換行才顯示連字元 2 hyphens屬性 css3提供了hyphens屬性指定文字多行自動換行應斷字。既可以在單詞中使用軟連字元指定換行點,也可以由瀏覽器在適當的位...
python3 mqtt sub永不斷連
mqtt 組成 1 mqtt 伺服器 2 mqtt 訂閱者 sub 3 mqtt 傳送者 pub 搭建mqtt 使用emqx,我的本地機器是centos 8 雲服務是 centos 7.8,搭建大同小異 搭建服務的話,看官網最好 然後就是測試啥的了,用python 測試 訂閱 傳送 先寫訂閱 建立 ...
兄弟連學python(3) 函式文件
函式文件就是用來檢視當前函式相關資訊介紹的乙個特定格式而已。檢視函式文件的方法 help 函式名 此方法會直接輸出函式文件的內容 函式名.doc 直接輸出顯示函式文件的內容元字串 可以使用print 函式名.doc 來解決無格式問題 定義函式文件的方法 def 函式名 引數 這裡編寫函式文件 這裡編...