搭建**框架
建立 test 目錄,編寫生產者和消費者
跑指令碼:
├─phpamqplib
│ ├─channel
│ ├─connection
│ ├─exception
│ ├─exchange
│ ├─helper
│ │ └─protocol
│ ├─message
│ └─wire
│ └─io
├─test
│ ├─retryp.php
│ └─retryc.php
└─index.php
<?php
function
my_autoloader
($cname
)spl_autoload_register
("my_autoloader");
if(isset
($ar**[2
]))$c
=new
$cname()
;if(!
method_exists($c
,$ar**[2
]))if
(isset
($ar**[3
]))else
}elseif(
isset
($ar**[1
]))$ar**[1
]();
}else
<?php
namespace
test
;use
phpamqplib\connection\amqpstreamconnection
;use
phpamqplib\message\amqpmessage
;use
phpamqplib\wire\amqptable
;class
retryp
}
<?php
namespace
test
;use
phpamqplib\connection\amqpstreamconnection
;use
phpamqplib\message\amqpmessage
;use
phpamqplib\wire\amqptable
;class
retryc
// 正常佇列
public
functionc(
)// 消費訊息,進行業務處理
$msg
->
delivery_info
['channel']-
>
basic_ack
($msg
->
delivery_info
['delivery_tag'])
;// 業務處理失敗,向 retryqueue 發訊息
$msgnew
=new
amqpmessage
(json_encode([
"data"
=>
$data
["data"],
"retrytime"
=>
$data
["retrytime"]+
1]))
;$msg
->
delivery_info
['channel']-
>
basic_publish
($msgnew
,'retry.retryexchange'
,'retry');
};$this
->
channel
->
basic_qos
(null,1
,null);
$this
->
channel
->
basic_consume
('retry.normalqueue',''
,false
,false
,false
,false
,$callback);
// 需要手動確認
while
(count
($this
->
channel
->
callbacks))
$this
->
channel
->
close()
;$this
->
connection
->
close()
;}}
封裝 axios 實現自動重試
為什麼寫這個題目呢?因為之前寫的乙個 node 程式有點小問題,使用的 axios 通過 請求資料,伺服器 阿布雲 時不時抽風 407 413 503 因為第一次寫的時候當做乙個 demo 去實現的,寫的挺簡陋,只能說大體功能對,但是沒有容錯機制。這裡我們先算一筆帳啊,一次請求等於 1 100 次,...
RabbitMQ實現延時佇列
rabbitmq實現延時佇列一般有兩種形式 第一種方式 利用兩個特性 time to live ttl dead letter exchanges dlx a訊息佇列過期 傳送給b佇列 第二種方式 利用rabbitmq的外掛程式x delay message rabbitmq可以針對佇列設定x ex...
Redis Zset實現延時佇列
前言 本篇博文意在使用redis模擬實現延時佇列.redis中的有序集合zset可以實現延時佇列,zset可以看作是縮小版的redis,可以看作是用來儲存鍵值對的集合,是集合名 k v的結構,在zset中,會按照score進行排序。有序集合中鍵值對的鍵被稱為成員,值被稱為分值,分值必須為浮點數。命令...