beanstalk,乙個高效能、輕量級的分布式記憶體佇列系統,最初設計的目的是想通過後台非同步執行耗時的任務來降低高容量web應用系統的頁面訪問延遲,支援過有9.5 million使用者的facebook causes應用。
參考:# yum install beanstalkd
檢視版本
# beanstalkd -v
啟動beanstalkd
# beanstalkd -l 127.0.0.1 -p 11300 -b /usr/local/beanstalkd/binlog &
-b表示開啟binlog,斷電後重啟自動恢復任務 。 /usr/local/beanstalkd/binlog為自建的目錄。
關於系統自動啟動beanstalkd的方法,可以參考如下,
linux配置開機啟動程序的方法這樣就不用每次都重新啟動beanstalkd了。
一、beanstalkd是什麼?
beanstalkd是乙個高效能,輕量級的分布式記憶體佇列
二、beanstalkd特性
1、支援優先順序(支援任務插隊)
2、延遲(實現定時任務)
3、持久化(定時把記憶體中的資料刷到binlog日誌)
4、預留(把任務設定成預留,消費者無法取出任務,等某個合適時機再拿出來處理)
5、任務超時重發(消費者必須在指定時間內處理任務,如果沒有則認為任務失敗,重新進入佇列)
三、beanstalkd核心元素
生產者 -> 管道(tube) -> 任務(job) -> 消費者
beanstalkd可以建立多個管道,管道裡面存了很多任務,消費者從管道中取出任務進行處理。
job:乙個需要非同步處理的任務,需要放在乙個tube中。
tube:乙個有名字的任務佇列,用來儲存統一型別的job,可以建立多個管道
producer:job的生產者
consumer:job的消費者
簡單流程:由 producer 產生乙個任務 job ,並將 job 推進到乙個 tube 中,然後由 consumer 從 tube 中取出 job 執行
四、任務job狀態
delayed 延遲狀態,延遲執行的任務,任務到期後會自動成為當前任務
ready 準備好狀態,可以被消費
reserved 消費者把任務讀出來,正在執行的任務
buried 預留狀態,保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回佇列;將任務取出之後,發現後面執行的邏輯不成熟(比如發郵件,突然發現郵件伺服器掛掉了), //或者說還不能執行後面的邏輯,需要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費
delete 刪除狀態,訊息被徹底刪除。beanstalkd 不再維持這些訊息。
七、pheanstalk使用方法
維護方法
stats() 檢視狀態方法
listtubes() 目前存在的管道
listtubeswatched() 目前監聽的管道
statstube() 管道的狀態
usetube() 指定使用的管道
statsjob() 檢視任務的詳細資訊
peek() 通過任務id獲取任務
生產者方法
putintube() 往管道中寫入資料
put() 配合usetube()使用
消費者方法
watch() 監聽管道,可以同時監聽多個管道
ignore() 不監聽管道
reserve() 以阻塞方式監聽管道,獲取任務
reservefromtube()
release() 把任務重新放回管道
bury() 把任務預留
peekburied() 把預留任務讀取出來
kickjob() 把buried狀態的任務設定成ready
kick() 批量把buried狀態的任務設定成ready
peekready() 把準備好的任務讀取出來
peekdelayed() 把延遲的任務讀取出來
pausetube() 給管道設定延遲
resumetube() 取消管道延遲
touch() 讓任務重新計算ttr時間,給任務續命
beanstalk主要包括4個部分。
1> job:乙個需要非同步處理的任務,需要放在乙個tube中。
2> tube:乙個有名的任務佇列,用來儲存統一型別的job,是producer和consumer操作的物件。
3> producer:job的生產者,通過put命令來將乙個job放到乙個tube中。
4> consumer:job的消費者,通過reserve、release、bury、delete命令來獲取job或改變job的狀態。
我們可以準備composer環境來執行
composer技術可參見:
var_dump('管道狀態=',$p->stats());//檢視目前pheanstalkd狀態資訊
var_dump('存在的管道=',$p->listtubes());//顯示目前存在的管道
consumer.php
<?php
require "vendor/autoload.php";
try
}}catch (exception $ex)
1.啟動beanstakld# beanstalkd -l 127.0.0.1 -p 11300 -b /usr/local/beanstalkd/binlog &
2.執行生產任務
3.執行消費任務
beanstalkd+redis實現方法
可以用redis快取任務狀態的方式實現。
#php consumer_redis.php
require "vendor/autoload.php";
//演示多個消費者可能同時獲取任務的流程,需要保證任務的同步處理(通過redis儲存任務狀態資訊)。
//1.任務可靠性問題,假設乙個消費者獲取到任務,執行任務時,突然宕機了,導致任務不能正常執行,
//比如訂單付款後增加積分或餘額的任務,沒有被正常執行,那麼資料會不一致,問題就很嚴重,
//2.消費冪等性問題,也就是消費者的任務重複執行問題,如果乙個任務被多個消費者重複執行,也會發生很嚴重問題,
//比如,訂單付款後庫存減少,如果被多次執行,會出現庫存不對的情況,
//訊息的雙向確認,去重/重傳
//為保證上述情況能正確處理,避免上述發生情況,我們可以將任務設定狀態,1任務正在執行中,2任務執行完成
$redis=new predis\client(['scheme'=>'tcp','host'=>"127.0.0.1",'port'=>6379]);
try elseif($state==2)else}}
}catch (exception $ex)
beanstalkd+swoole實現方法為提高任務非同步處理效能,我們可以使用swoole來處理。
#php consumer_swoole.php
<?php
require "vendor/autoload.php";
$workernum = 4;
$pool = new swoole\process\pool($workernum);
$pool->on("workerstart", function ($pool, $workerid)
}} catch (exception $ex)
});$pool->on("workerstop", function ($pool, $workerid) is stopped\n";
});$pool->start();
訊息中介軟體
1.訊息的優先順序 2.訊息排序 3.訊息過濾 4.訊息持久化 5.訊息重試 6.事務的支援 7.broker滿 生產者,佇列,消費者 訊息佇列的優點 1 解耦2 非同步訊息,系統響應 在jms中,有兩種訊息模型 點對點模式和發布訂閱模式。1.在點對點模式中 有三種角色 1 訊息佇列,傳送者,接受者...
訊息中介軟體
如何理解訊息中介軟體?訊息中介軟體是儲存訊息的乙個容器,與資料庫不同的是資料庫儲存的資料是可以被修改的,而訊息中介軟體一般不會被修改 訊息中介軟體在消費的生產者與消費者產生,相當於乙個中間人的角色,提供了路由保證訊息的傳遞,如果消費者不能及時接收,訊息會保留下來,知道消費者上線 保證在存活期內 訊息...
訊息中介軟體
訊息中介軟體是在訊息的傳輸過程中儲存訊息 訊息傳遞過程中不能更改 的容器。訊息中介軟體再將訊息從它的原中繼到它的目標時充當中間人的作用。訊息中介軟體的主要目的是提供路由並保證訊息的傳遞 如果傳送訊息時接收者不可用,訊息佇列會保留訊息,知道可以成功傳遞為止,當然,訊息佇列儲存訊息也是有期限的。訊息傳送...