最近使用者反饋提交的sql
查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。
現象:即使sql
控制台提交10個簡單sql
查詢 -> 訊息傳送方:傳送10條訊息至訊息佇列 -> 訊息消費方:只消費了7條訊息
訊息佇列:rabbitmq
:
消費者:python
:
結論:訊息傳送正常
排查步驟:檢視log
結論:訊息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue
外掛程式,用於dump
佇列的訊息;
1. 執行機:停止服務;
2. 使用者:提交10個sql
查詢:
3. 傳送方:檢視web
服務端的輸出日誌,確定10個訊息已經往訊息佇列寫;
4. 執行機:通過rabbitmq-dump-queue
檢視佇列的訊息,確認是正常10個訊息寫入;
watch -n 1
'$gopath/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@***xx:5672" -queue ph_open_task'
5. 執行機:啟動服務,訊息佇列中的訊息全部被接收;**邏輯:
try:
pool = pool(processes=40)
def callback(ch, method, properties, body):
try:
dosomething...
except exception as e:
print traceback.format_exc()
logger_msg.info(traceback.format_exc())
finally:
// 這裡會有問題,即使訊息未被處理也會反饋ack給rabbitmq
ch.basic_ack(delivery_tag=method.delivery_tag)
while
true:
try:
connection = pika.blockingconnection(
pika.connectionparameters(host='******xx'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=true)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue_name, no_ack=false)
channel.start_consuming()
except pika.exceptions.connectionclosed as e:
continue
except exception as e:
logger_msg.info(traceback.format_exc())
finally:
channel.basic_ack(delivery_tag=method.delivery_tag)
pool.close
()pool.join
()
結論:本例中消費者主程序將持續監聽mq
,一旦mq
有訊息將會拉取,隨後從程序池中啟動子程序來處理訊息,但是從程序池啟動子程序的過程並不一定成功(若當前程序池沒有空閒子程序),而主程序不管任何情況下都給mq
傳送ack
狀態碼,從而mq
將未處理的訊息移除掉,導致訊息丟失
問題是在消費者環節產生,因此對消費者做改動,需要調整消費者的架構:
目前方案的問題以及解決方案:
RabbitMQ防止訊息丟失
rabbitmq中,訊息丟失可以簡單的分為兩種 客戶端丟失和服務端丟失。針對這兩種訊息丟失,rabbitmq都給出了相應的解決方案。回到目錄 如圖,生產者p向佇列中生產訊息,c1和c2消費佇列中的訊息,預設情況下,rabbitmq會平均的分發消費給c1c2 round robin dispatchi...
RabbitMQ防止訊息丟失
rabbitmq一般情況很少丟失,但是不能排除意外,為了保證系統高可用,我們必須作出更好完善措施,保證系統的穩定性。1.訊息持久化 2.ack確認機制 3.設定集群映象模式 4.訊息補償機制 第一種 訊息持久化 rabbitmq 的訊息預設存放在記憶體上面,如果不特別宣告設定,訊息不會持久化儲存到硬...
rabbitmq 重複ACK導致訊息丟失
背景 rabbitmq 在應用場景中,大多採用工作佇列 work queue的模式。在乙個常見的工作佇列模式中,消費者 worker 將不斷的輪詢從佇列中拉取最新訊息,當佇列負載壓力增大時允許新增多個worker 進行處理。然而執行乙個任務可能需要相當的時長,這是由業務特性所決定的 如果 worke...