乙個高效能、輕量級的分布式記憶體佇列系統。高效能離不開非同步,非同步離不開佇列,而其內部都是producer-consumer模式的原理。
元件說明
管道(tube)
乙個有名稱的任務佇列,用來儲存統一型別的job,是producer和consumer的操作物件
任務(job)
乙個需要非同步處理的任務,需要放在tube中
生產者(producer)
job的生產者,通過put命令來將乙個job放到乙個tube中
消費者(consumer)
job的消費者,通過reserve、release、bury、delete命令來獲取job或改變job的狀態
狀態說明
ready
已經準備好的任務,可以給消費者獲取
delayed
延遲執行的任務,設定時候設定了延遲時間
reserved
已被消費者獲取,正在執行的任務,beanstalkd服務負責檢查任務是否 在ttr(time-to-run)內完成
buried
保留的任務,任務不會被執行,也不會消失
delete
訊息被徹底刪除,beanstalkd不再維護這些訊息
wget
安裝
tar xzvf beanstalkd-1.11.tar.gz
cd beanstalkd-1.11
make & make install
beanstalkd -v
啟動服務
beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -f
//php版本要求 7.1+
composer require pda/pheanstalk:~4.0
執行composer後,在專案composer.json配置檔案中將增加pda/pheanstalk依賴包
"require"
:
//建立例項
$client
= pheanstalk:
:create
($host
,$port
,$timeout);
//設定使用的tube,新增任務資料
//$data 任務資料
//$priority 任務優先順序.小優先順序數值的job將會排在大優先順序 數值的job前面執行。
//最高優先順序是0,最低優先順序是4,294,967,295
//$delay 任務延遲執行秒數
//$ttr 允許乙個消費者執行該job的秒數
$client
->
usetube
($tube)-
>
put(
$data
,$priority
,$delay
,$ttr
);
ini_set
('default_socket_timeout',24
*60*60
);$client
= pheanstalk:
:create
($host
,$port
,$timeout);
$client
->
watchonly
($tube);
while
(true
)//設定重新計算ttr
$client
->
touch
($job);
//獲取任務資料
$data
=$job
->
getdata()
;//開始執行任務
//任務執行邏輯
$res
=true
//結束任務執行
//刪除任務
$client
->
delete
($job);
if($res
===true
)else
}
default_socket_timeout
這個引數是一定要加的,php 預設一般是 60s,假如您沒有在**裡面設定,採用預設的話(60s),60s 之內如果沒有 job 產生,指令碼就會報 socket 錯誤。
以下基於pda/pheanstalk依賴包實現的beanstalk操作類,供參考。。
<?php
namespace
;use
pheanstalk\pheanstalk
;/**
* beanstalk工具類
* @since 2020-02-26
*/class
beanstalk
/** * 新增任務
* @param string $tube 佇列管道
* @param array $parameters
* @param int $priority 優先順序
* @param int $delay 延遲執行時間
* @param int $ttr 任務超時時間
* @return bool|mixed
*/public
function
addtask
($tube
,array
$parameters
,$priority
= pheanstalk:
:default_priority
,$delay
= pheanstalk:
:default_delay
,$ttr
= pheanstalk:
:default_ttr
)/**
* 建立客戶端連線
* @return mixed
*/public
function
createclient()
return
$this
->
clients
[$this
->
connection];
}/**
* 建立後台工作程序
* @param $tube
* @param array $service
* @return mixed
* @author huangweizhang
* @throws \exception
*/public
function
createworker
($tube
,array
$service
)list
($classname
,$method)=
$service;if
(!class_exists
($classname))
if(!method_exists
($classname
,$method))
//獲取任務
$job
=$client
->
reserve()
;if(is_null
($job))
$client
->
touch
($job);
//獲取任務引數
$stream
=$job
->
getdata()
;$parameters
=unserialize
($stream);
trycatch
(\exception$e)
}}/** * 設定連線
* @param string $connection
*/public
function
setconnection
($connection
)/**
* 設定客戶端連線超時時間
* @param int $clienttimeout
*/public
function
setclienttimeout
($clienttimeout
)}
activeMQ搭建佇列模式
生產者 public class produce connection.close system.out.println 訊息傳送完畢 消費者 public class consumercatch exception e 當然開發中不會有這麼複雜的情況,自己用一些比較通俗的語言來描述一下,生產者就是...
redis 訊息佇列 環境搭建
在我們的生產環境中有兩台伺服器 1.我們在主伺服器上安裝了redis 和phpredis,同樣我們在從伺服器上也進行同樣的安裝。2.我們把主伺服器的產生的log都放到redis的lpush放到list資料型別中 redis lpush sb log list log message 3.然後在從伺服...
ubuntu搭建rabbitmq訊息佇列
rabbitmq 視覺化介面安裝 1.安裝rabbitmq sudo apt install rabbitmq server 2.檢視安裝的版本 sudo rabbitmqctl status grep rabbit 3.安裝管理外掛程式 進入到rabbitmq的安裝目錄 usr lib rabbi...