beanstalkd訊息佇列

2021-07-12 02:10:36 字數 3174 閱讀 3320

簡介

beanstalkd,乙個高效能、輕量級的分布式記憶體佇列系統,最初設計的目的是想通過後台非同步執行耗時的任務來降低高容量web應用系統的頁面訪問延遲,支援過有9.5 million使用者的facebook causes應用。後來開源,現在有postrank大規模部署和使用,每天處理百萬級任務。beanstalkd是典型的類memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的使用者會覺得beanstalkd似曾相識。

beanstalkd安裝啟動

安裝命令:yum install beanstalkd

後台啟動:beanstalkd -l 192.168.2.231 -p 11300 &

(beanstalkd -l 位址 -p 埠號 -z 最大的任務大小(byte) -c &)

啟動選項

-b dir wal directory

-f ms fsync at most once every ms milliseconds (use -f0 for 「always fsync」)

-f never fsync (default)

-l addr listen on address (default is 0.0.0.0)

-p port listen on port (default is 11300)

-u user become user and group

-z bytes set the maximum job size in bytes (default is 65535)

-s bytes set the size of each wal file (default is 10485760)

(will be rounded up to a multiple of 512 bytes)

-c compact the binlog (default)

-n do not compact the binlog

-v show version information

-v increase verbosity

-h show this help

專案配置

檔案shop/hnguanli/config/params.default.php重新命名為shop/hnguanli/config/params.php

1、增加傳送佇列(管道)

return

array(

'subscribers' => array( // 訊息訂閱者清單

'goods' => array( // 商品更新 -- 管道組

'solr', // 通知物件 -- 管道)),

);

按照實體分組,將管道組置於subscribers陣列元素裡面,要通知的物件作為乙個管道新增到對應的管道組下面,如goods為乙個管道組別,通知物件為solr。

2、配置beanstalkd

'beanstalkd' => array(     // beanstalkd服務配置

'persistent' => true, // 是否保持長連線

'host' => '192.168.2.231', // ip位址

'port' => 11300, // 埠號

'timeout' => 3, // 連線超時時間

),

注意ip位址和埠號要與beanstalkd程序相對應

傳送訊息佇列

業務完成後通知對應管道組中物件更新,可使用lib_message_queue.php的product方法

require_once

'/data/shop/hnguanli/includes/lib_message_queue.php';

$mq = new messagequeue();

$mq->product('goods', $goods_id); //goods為通知的管道組,$goods_id可為int或array

監聽訊息佇列指令碼

訊息已發出了,當然也需要乙個來消費訊息並執行相應業務的指令碼,目前放置監聽指令碼的目錄為/data/shop/hnguanli/scripts/

具體例子可參照/data/shop/hnguanli/scripts/goods_watch.php

先了解一下messagequeue::watch()的定義:

該方法第一次會建立乙個到訊息伺服器的長連線,並監聽訂閱的佇列,只要佇列中有訊息(無論是新的還是未處理成功的),就會阻塞當前獲取的訊息,呼叫訊息處理函式,待處理完畢後繼續監聽佇列。

注意:訊息處理函式處理完更新業務後,一定要顯式返回true,否則下次接收到的訊息永遠是上一條訊息。

//solr--管道, goods--管道組

$queue = isset($_server["argv"][1]) ? $_server["argv"][1] : 'goods';

$mq = new messagequeue('solr');

try );

} catch (exception

$e)

另外變數$queue為指令碼命令中傳送的第乙個引數,如果通知物件不同但使用同乙個監聽指令碼時,需要在執行指令碼命令中帶上這個引數。

傳送和接收訊息都已經實現了,接收就需要啟動對應的監聽指令碼。下列以後臺執行的方式啟動。

nohup /usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &

或者 (/usr/local/php/bin/php /data/shop/hnguanli/scripts/goods_watch.php &)

準備工作都做好了,接下來就可以進行測試,以下的log檔案會有助於你分析和定位問題。

生產及消費訊息log:/data/shop/hnguanli/hnlog_/mqlog/管道組名.管道名.log

指令碼執行錯誤log:/data/shop/hnguanli/hnlog_/mqlog/指令碼名.管道名.log

Beanstalkd訊息佇列介紹

beanstalkd是乙個高效能,輕量級的分布式記憶體佇列 1 支援優先順序 支援任務插隊 2 延遲 實現定時任務 3 持久化 定時把記憶體中的資料刷到binlog日誌 4 預留 把任務設定成預留,消費者無法取出任務,等某個合適時機再拿出來處理 5 任務超時重發 消費者必須在指定時間內處理任務,如果...

beanstalkd 訊息佇列發郵件

放入訊息 獲取beanstalk例項 staticvar resource bool beanstalk return resource function get beanstalk libraries beanstalkd pheanstalk init.php 載入配置檔案 ci get ins...

beanstalkd 訊息佇列發郵件

放入訊息 獲取beanstalk例項 staticvar resource bool beanstalk return resource function get beanstalk libraries beanstalkd pheanstalk init.php 載入配置檔案 ci get ins...