celery分布式使用

2022-07-15 09:21:12 字數 1612 閱讀 9698

celery包含如下元件:

1. celery beat:任務排程器,beat程序會讀取配置檔案的內容,周期性地將配置中到期需要執行的任務傳送給任務佇列(一般用於定時任務使用)。

2. celery worker:執行任務的消費者,通常會在多台伺服器執行多個消費者來提高執行效率。

3. broker:訊息**,或者叫作訊息中介軟體,接受任務生產者傳送過來的任務訊息,存進佇列再按序分發給任務消費方(本方案使用redis)。

4. producer:呼叫了celery提供的api、函式或者裝飾器而產生任務並交給任務佇列處理的都是任務生產者。

5. result backend:任務處理完後儲存狀態資訊和結果,以供查詢。(本方案使用redis來儲存結果)

celery_queues = ( # 定義任務佇列

queue("tasks_a", routing_key="a.#"), # 路由鍵以「a.」開頭的訊息都進tasks_a佇列

)celery_routes = (

[("distributed.taska", ), # 將taska任務分配至佇列 tasks_a

],)celery_result_serializer = "json" # 讀取任務結果一般效能要求不高,所以使用了可讀性更好的json

celery_task_result_expires = 60 * 60 * 24 # 任務過期時間

celery_result_backend = 'redis:'

broker_url = 'redis:'

celery -a distributed.celery worker -q tasks_a --concurrency=2 -l info

-q後面為方法對應的佇列名稱

--concurrency= 後面 為開啟的worker數目,也可以使用 -c=

具體開啟是數目根據你的電腦cpu個數確定,小於等於cpu個數即可

備註
python(3.8.2)之後celery4.1.1版本使用不了,目前使用5.0.2版本,然後又遇見以下錯誤:

啟動celery時,報 from . import async, base syntaxerror: invalid syntax

查了資料後了解到是因為celery依賴中的async模組和python的關鍵字async存在衝突, 所以在匯入時報錯。

使用 pip install --upgrade 來更新一下就行了。

Celery分布式應用

最近有應用需要部署到不同的伺服器上執行,但是有沒有pbs這樣的排程系統,就想起來python的排程神器 celery。現在針對我的實際應用做一些記錄。因為我並不注重結果而是把命令拿到不同的機器上執行,所以訊息 和結果儲存我都選擇了 redis 只需要在一台伺服器上安裝 redis,其他伺服器均使用該...

celery實現分布式

worker執行在不同的機器上,每台機器上有多個task程序 worker執行的時候連線到broker,在控制機器上 任務發布節點 直接向broker傳送任務,只要建立乙個broker,在不同的機器上執行celery worker。框架圖 task.py coding utf 8 from cele...

分布式佇列 Celery

詳情參見 分布式佇列神器 celery 2 celery 4.1 使用者指南 task 任務 3 celery 4.1 使用者指南 calling tasks 呼叫任務 4 celery 4.1 使用者指南 canvas designing work flows 設計工作流程 5 celery 4....