解決python3 pika之連線斷開的問題

2022-10-03 12:51:12 字數 4222 閱讀 5994

問題描述

在消費rabbitmq佇列時, 每次進入**函式內需要進行一些比較耗時的操作;操作完成後給rabbitmq server傳送ack訊號以dequeue本條訊息。

問題就發生在傳送ack操作時, 程式提示鏈結已被斷開或socket error。

原始碼示例

#!/usr/bin

#coding: utf-8

import pika

import time

user = 'guest'

pwd = 'guest'

test_queue = 'just4test'

def callback(ch, method, properties, body):

print(body)

time.sleep(600)

ch.basic_publish('', routing_key=test_queue, body="fortest")

ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():

s_conn = pika.blockingconnection(

pika.connectionparameters('127.0.0.1',

credentials=pika.plaincredentials(user, pwd)))

chan = s_conn.channel()

chan.queue_declare(queue=test_queue)

chan.basic_publish('', routing_key=test_queue, body="fortest")

chan.basic_consume(callback, queue=test_queue)

chan.start_consuming()

if __name__ == "__main__":

test_main()

執行一段時間後, 就會報錯:

[error][pika.adapters.base_connection][2017-08-18 12:33:49]error event 25, none

[critical][pika.adapters.base_connection][2017-08-18 12:33:49]tried to handle an error where no error existed

[error][pika.adapters.base_connection][2017-08-18 12:33:49]fatal socket error: brokenpipeerror(32, 'broken pipe')

問題排查

猜測:pika客戶端沒有及時傳送心跳,連線被server斷開

一開始修改了heartbeat_interval引數值, 示例如下:

def test_main():

s_conn = pika.blockingconnection(

pika.connectionparameters('127.0.0.1',

hearrzsgetbeat_inwww.cppcns.comterval=10,

socket_timeout=5,

credentials=pika.plaincredentials(user, pwd)))

# ....

修改後執行依程式設計客棧然報錯,後來想想應該單執行緒被一直占用,pika無法傳送心跳;

於是又加了個心跳執行緒, 示例如下:

#!/usr/bin

#coding: utf-8

import pika

import time

import logging

import threading

user = 'guest'

pwd = 'guest'

test_queue = 'just4test'

class heartbeat(threading.thread):

def __init__(self, connection):

super(heartbeat, self).__init__()

self.lock = threading.lock()

self.connection = connection

self.quitflag = false

self.stopflag = true

self.setdaemon(true)

def run(self):

while not self.quitflag:

time.sleep(10)

self.lock.acquire()

if self.stopflag :

self.lock.release()

continue

try:

self.connection.process_data_events()

except exception as ex:

logging.warn("error format: %s"%(str(ex)))

self.lock.release()

return

self.lock.release()

def startheartbeat(self):

self.lock.acquire()

if self.quitflag==true:

self.lock.release()

return

self.stopflag=false

self.lock.release()

def callback(ch, method, properties, body):

logging.info("recv_body:%s" % body)

time.sleep(600)

ch.basic_ack(delivery_tag = method.delivery_tag)

def test_main():

s_conn = pika.blockingconnection(

pika.connectionparameters('127.0.0.1',

heartbeat_interval=10,

socket_timeout=5,

credentials=pika.plaincredentials(user, pwd)))

chan = s_conn.channel()

chan.queue_declare(queue=test_queue)

chan.basic_consume(callback,

queue=test_queue)

heartbeat = heartbeat(s_conn)

heartbeat.start() #開啟心跳執行緒

heartbeat.程式設計客棧startheartbeat()

chan.start_consuming()

if __name__ == "__main__":

test_main()

嘗試執行,結果還是不行,不得不安靜下來思考自己是不是想錯了。

去看它的api,看到heartbeat_interval的解析:

:param int heartbeat_int程式設計客棧erval: how often to send heartbeats.

min between this value and server's proposal

will be used. use 0 to deactivate heartbeats

and none to accept server's proposal.

按這樣說法,應該還是沒有把心跳值給設定好。上面的程式期望是10秒發一次心跳,但是理論上傳送心跳的間隔會比10秒多一點。所以艾瑪,我應該是把heartbeat_interval的作用搞錯了, 它是指超過這個時間間隔不發心跳或不給server任何資訊,server就會斷開連線, 而不是說pika會按這個間隔來發心跳。 結果我把heartbeat_interval值設定高一點(比實際傳送心跳/資訊的間隔更長),比如上面設定成60秒,就正常執行了。

如果不指定heartbeat_interval, 它預設為none, 意味著按rabbitmq server的配置來檢測心跳是否正常。

如果設定heartbeat_interval=0, 意味著不檢測心跳,server端將不會主動斷開連線。

本文標題: 解決python3 pika之連線斷開的問題

本文位址:

CSS3之hyphens(連字元)

1 連字元 連字元分為硬連字元和軟連字元 硬連字元 hyphen 即使該單詞沒有中間換行,也會顯示連字元 軟連字元 shy 只有單詞中間換行才顯示連字元 2 hyphens屬性 css3提供了hyphens屬性指定文字多行自動換行應斷字。既可以在單詞中使用軟連字元指定換行點,也可以由瀏覽器在適當的位...

python3 mqtt sub永不斷連

mqtt 組成 1 mqtt 伺服器 2 mqtt 訂閱者 sub 3 mqtt 傳送者 pub 搭建mqtt 使用emqx,我的本地機器是centos 8 雲服務是 centos 7.8,搭建大同小異 搭建服務的話,看官網最好 然後就是測試啥的了,用python 測試 訂閱 傳送 先寫訂閱 建立 ...

兄弟連學python(3) 函式文件

函式文件就是用來檢視當前函式相關資訊介紹的乙個特定格式而已。檢視函式文件的方法 help 函式名 此方法會直接輸出函式文件的內容 函式名.doc 直接輸出顯示函式文件的內容元字串 可以使用print 函式名.doc 來解決無格式問題 定義函式文件的方法 def 函式名 引數 這裡編寫函式文件 這裡編...