celery是乙個簡單、靈活且可靠的,處理大量訊息的分布式系統
專注於實時處理的非同步任務佇列
同時也支援任務排程
celery的架構由三部分組成,訊息中介軟體(message broker),任務執行單元(worker)和任務執行結果儲存(task result store)組成。
訊息中介軟體
celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,rabbitmq, redis等等
任務執行單元
worker是celery提供的任務執行的單元,worker併發的執行在分布式的系統節點中。
任務結果儲存
task result store用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括amqp, redis等
celery version 4.0 runs on定時任務:定時執行某件事情,比如每天資料統計python ❨2.7, 3.4, 3.5❩
pypy ❨5.4, 5.5❩
this is the last version to support python 2.7, and from the next version (celery 5.x) python 3.5 or newer is required.
if you』re running an older version of python, you need to be running an older version of celery:
python 2.6: celery series 3.1 or earlier.
python 2.5: celery series 3.0 or earlier.
python 2.4 was celery series 2.2 or earlier.
celery is a project with minimal funding, so we don』t support microsoft windows. please don』t open any issues related to that platform.
pip install celery
訊息中介軟體:rabbitmq/redis
基本使用
建立專案celerytest
import celery建立py檔案:add_task.py,新增任務import time
# broker='redis:' 不加密碼
backend='redis:'
broker='redis:'
cel=celery.celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
return x+y
建立py檔案:result.py,檢視任務執行結果
執行 add_task.py,新增任務,並獲取任務id
執行 result.py,檢查任務狀態並獲取結果
多工結構
pro_celcelery.py├── celery_task# celery相關資料夾
│ ├── celery.py # celery連線和配置相關檔案,必須叫這個名字
│ └── tasks1.py # 所有任務函式
│└── tasks2.py # 所有任務函式
├── check_result.py # 檢查結果
└── send_task.py # 觸發任務
from celery import celerytasks1.pycel = celery('celery_demo',
broker='redis:',
backend='redis:',
# 包含以下兩個任務檔案,去相應的py檔案中找任務,對多個任務做分類
include=['celery_task.tasks1',
'celery_task.tasks2'])
# 時區
cel.conf.timezone = 'asia/shanghai'
# 是否使用utc
cel.conf.enable_utc = false
import timetasks2.pyfrom celery_task.celery import cel
@cel.task
def test_celery(res):
time.sleep(5)
return "test_celery任務結果:%s"%res
import timecheck_result.pyfrom celery_task.celery import cel
@cel.task
def test_celery2(res):
time.sleep(5)
return "test_celery2任務結果:%s"%res
send_task.py
from celery_task.tasks1 import test_celery新增任務(執行send_task.py),開啟work:celery worker -a celery_task -l info -p eventlet,檢查任務執行結果(執行check_result.py)from celery_task.tasks2 import test_celery2
# 立即告知celery去執行test_celery任務,並傳入乙個引數
result = test_celery.delay('第乙個的執行')
print(result.id)
result = test_celery2.delay('第二個的執行')
print(result.id)
設定時間讓celery執行乙個任務
add_task.py
類似於contab的定時任務
多工結構中celery.py修改如下
from datetime import timedelta啟動乙個beat:celery beat -a celery_task -l infofrom celery import celery
from celery.schedules import crontab
cel = celery('tasks', broker='redis:', backend='redis:', include=[
'celery_task.tasks1',
'celery_task.tasks2',
])cel.conf.timezone = 'asia/shanghai'
cel.conf.enable_utc = false
cel.conf.beat_schedule = ,
# 'add-every-12-seconds': ,
}
啟動work執行:celery worker -a celery_task -l info -p eventlet
安裝包
celery==3.1.25在專案目錄下建立celeryconfig.pydjango-celery==3.1.20
from celery import task檢視函式views.py@task
def add(a,b):
with open('a.text', 'a', encoding='utf-8') as f:
f.write('a')
print(a+b)
settings.py
分布式任務佇列Celery
celery 芹菜 是基於python開發的分布式任務佇列。它支援使用任務佇列的方式在分布的機器 程序 執行緒上執行任務排程。基本用法是在程式裡引用celery,並將函式方法繫結到task from celery import celery def add x,y return x y from t...
Celery分布式任務佇列
celery是一款非常簡單,靈活,可靠的分布式系統,可用於處理大量訊息,並且提供了一整套操作此系統的一系列工具 celery是一款訊息佇列工具,可用於處理實時資料以及任務排程 什麼是任務佇列?任務佇列一般用於執行緒或計算機之間分配工作的一種機制 任務佇列的輸入是乙個成為任務的工作單元,有專門的職稱 ...
關於Celery 分布式任務佇列
celery 是 distributed task queue,分布式任務佇列,分布式決定了可以有多個 worker 的存在,佇列表示其是非同步操作,即存在乙個產生任務提出需求的工頭,和一群等著被分配工作的碼農。在 python 中定義 celery 的時候,我們要引入 broker,中文翻譯過來就...