celery包含如下元件:1. celery beat:任務排程器,beat程序會讀取配置檔案的內容,周期性地將配置中到期需要執行的任務傳送給任務佇列(一般用於定時任務使用)。
2. celery worker:執行任務的消費者,通常會在多台伺服器執行多個消費者來提高執行效率。
3. broker:訊息**,或者叫作訊息中介軟體,接受任務生產者傳送過來的任務訊息,存進佇列再按序分發給任務消費方(本方案使用redis)。
4. producer:呼叫了celery提供的api、函式或者裝飾器而產生任務並交給任務佇列處理的都是任務生產者。
5. result backend:任務處理完後儲存狀態資訊和結果,以供查詢。(本方案使用redis來儲存結果)
celery_queues = ( # 定義任務佇列
queue("tasks_a", routing_key="a.#"), # 路由鍵以「a.」開頭的訊息都進tasks_a佇列
)celery_routes = (
[("distributed.taska", ), # 將taska任務分配至佇列 tasks_a
],)celery_result_serializer = "json" # 讀取任務結果一般效能要求不高,所以使用了可讀性更好的json
celery_task_result_expires = 60 * 60 * 24 # 任務過期時間
celery_result_backend = 'redis:'
broker_url = 'redis:'
celery -a distributed.celery worker -q tasks_a --concurrency=2 -l info
-q後面為方法對應的佇列名稱備註--concurrency= 後面 為開啟的worker數目,也可以使用 -c=
具體開啟是數目根據你的電腦cpu個數確定,小於等於cpu個數即可
python(3.8.2)之後celery4.1.1版本使用不了,目前使用5.0.2版本,然後又遇見以下錯誤:啟動celery時,報 from . import async, base syntaxerror: invalid syntax
查了資料後了解到是因為celery依賴中的async模組和python的關鍵字async存在衝突, 所以在匯入時報錯。
使用 pip install --upgrade 來更新一下就行了。
Celery分布式應用
最近有應用需要部署到不同的伺服器上執行,但是有沒有pbs這樣的排程系統,就想起來python的排程神器 celery。現在針對我的實際應用做一些記錄。因為我並不注重結果而是把命令拿到不同的機器上執行,所以訊息 和結果儲存我都選擇了 redis 只需要在一台伺服器上安裝 redis,其他伺服器均使用該...
celery實現分布式
worker執行在不同的機器上,每台機器上有多個task程序 worker執行的時候連線到broker,在控制機器上 任務發布節點 直接向broker傳送任務,只要建立乙個broker,在不同的機器上執行celery worker。框架圖 task.py coding utf 8 from cele...
分布式佇列 Celery
詳情參見 分布式佇列神器 celery 2 celery 4.1 使用者指南 task 任務 3 celery 4.1 使用者指南 calling tasks 呼叫任務 4 celery 4.1 使用者指南 canvas designing work flows 設計工作流程 5 celery 4....