使用Beanstalk搭建佇列服務

2021-10-03 08:11:11 字數 4316 閱讀 7099

乙個高效能、輕量級的分布式記憶體佇列系統。高效能離不開非同步,非同步離不開佇列,而其內部都是producer-consumer模式的原理。

元件說明

管道(tube)

乙個有名稱的任務佇列,用來儲存統一型別的job,是producer和consumer的操作物件

任務(job)

乙個需要非同步處理的任務,需要放在tube中

生產者(producer)

job的生產者,通過put命令來將乙個job放到乙個tube中

消費者(consumer)

job的消費者,通過reserve、release、bury、delete命令來獲取job或改變job的狀態

狀態說明

ready

已經準備好的任務,可以給消費者獲取

delayed

延遲執行的任務,設定時候設定了延遲時間

reserved

已被消費者獲取,正在執行的任務,beanstalkd服務負責檢查任務是否 在ttr(time-to-run)內完成

buried

保留的任務,任務不會被執行,也不會消失

delete

訊息被徹底刪除,beanstalkd不再維護這些訊息

wget
安裝

tar xzvf beanstalkd-1.11.tar.gz

cd beanstalkd-1.11

make & make install

beanstalkd -v

啟動服務

beanstalkd -l 0.0.0.0 -p 11300 -b /log/beanstalkd/binlog -f
//php版本要求 7.1+

composer require pda/pheanstalk:~4.0

執行composer後,在專案composer.json配置檔案中將增加pda/pheanstalk依賴包

"require"

:

//建立例項

$client

= pheanstalk:

:create

($host

,$port

,$timeout);

//設定使用的tube,新增任務資料

//$data 任務資料

//$priority 任務優先順序.小優先順序數值的job將會排在大優先順序 數值的job前面執行。

//最高優先順序是0,最低優先順序是4,294,967,295

//$delay 任務延遲執行秒數

//$ttr 允許乙個消費者執行該job的秒數

$client

->

usetube

($tube)-

>

put(

$data

,$priority

,$delay

,$ttr

);

ini_set

('default_socket_timeout',24

*60*60

);$client

= pheanstalk:

:create

($host

,$port

,$timeout);

$client

->

watchonly

($tube);

while

(true

)//設定重新計算ttr

$client

->

touch

($job);

//獲取任務資料

$data

=$job

->

getdata()

;//開始執行任務

//任務執行邏輯

$res

=true

//結束任務執行

//刪除任務

$client

->

delete

($job);

if($res

===true

)else

}

default_socket_timeout這個引數是一定要加的,php 預設一般是 60s,假如您沒有在**裡面設定,採用預設的話(60s),60s 之內如果沒有 job 產生,指令碼就會報 socket 錯誤。

以下基於pda/pheanstalk依賴包實現的beanstalk操作類,供參考。。

<?php 

namespace

;use

pheanstalk\pheanstalk

;/**

* beanstalk工具類

* @since 2020-02-26

*/class

beanstalk

/** * 新增任務

* @param string $tube 佇列管道

* @param array $parameters

* @param int $priority 優先順序

* @param int $delay 延遲執行時間

* @param int $ttr 任務超時時間

* @return bool|mixed

*/public

function

addtask

($tube

,array

$parameters

,$priority

= pheanstalk:

:default_priority

,$delay

= pheanstalk:

:default_delay

,$ttr

= pheanstalk:

:default_ttr

)/**

* 建立客戶端連線

* @return mixed

*/public

function

createclient()

return

$this

->

clients

[$this

->

connection];

}/**

* 建立後台工作程序

* @param $tube

* @param array $service

* @return mixed

* @author huangweizhang

* @throws \exception

*/public

function

createworker

($tube

,array

$service

)list

($classname

,$method)=

$service;if

(!class_exists

($classname))

if(!method_exists

($classname

,$method))

//獲取任務

$job

=$client

->

reserve()

;if(is_null

($job))

$client

->

touch

($job);

//獲取任務引數

$stream

=$job

->

getdata()

;$parameters

=unserialize

($stream);

trycatch

(\exception$e)

}}/** * 設定連線

* @param string $connection

*/public

function

setconnection

($connection

)/**

* 設定客戶端連線超時時間

* @param int $clienttimeout

*/public

function

setclienttimeout

($clienttimeout

)}

activeMQ搭建佇列模式

生產者 public class produce connection.close system.out.println 訊息傳送完畢 消費者 public class consumercatch exception e 當然開發中不會有這麼複雜的情況,自己用一些比較通俗的語言來描述一下,生產者就是...

redis 訊息佇列 環境搭建

在我們的生產環境中有兩台伺服器 1.我們在主伺服器上安裝了redis 和phpredis,同樣我們在從伺服器上也進行同樣的安裝。2.我們把主伺服器的產生的log都放到redis的lpush放到list資料型別中 redis lpush sb log list log message 3.然後在從伺服...

ubuntu搭建rabbitmq訊息佇列

rabbitmq 視覺化介面安裝 1.安裝rabbitmq sudo apt install rabbitmq server 2.檢視安裝的版本 sudo rabbitmqctl status grep rabbit 3.安裝管理外掛程式 進入到rabbitmq的安裝目錄 usr lib rabbi...