celery是乙個簡單、靈活且可靠的,處理大量訊息的分布式系統 專注於實時處理的非同步任務佇列 同時也支援任務排程ps:celery在windows系統上會出現不相容的情況celery 官網:
celery 官方文件英文版:
celery 官方文件中文版:
1. 可以不依賴任何伺服器,通過自身命令,啟動服務(內部支援socket)
2. celery服務為為其他專案服務提供非同步解決任務需求的
注:會有兩個服務同時執行,乙個是專案服務,乙個是celery服務,專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求
人是乙個獨立執行的服務 | 醫院也是乙個獨立執行的服務
正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求
celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,rabbitmq, redis等等
worker是celery提供的任務執行的單元,worker併發的執行在分布式的系統節點中。
task result store用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括amqp, redis等
延遲執行:解決延遲任務
定時執行:解決週期(週期)任務,比如每天資料統計
pip install celery
訊息中介軟體:rabbitmq/redis
# 如果 celery物件:celery(...) 是放在乙個模組下的
# 1)終端切換到該模組所在資料夾位置:scripts
# 2)執行啟動worker的命令:celery worker -a 模組名 -l info -p eventlet
# 注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 模組名 -l info
# 注:模組名隨意
# 如果 celery物件:celery(...) 是放在乙個包下的
# 1)必須在這個包下建乙個celery.py的檔案,將celery(...)產生物件的語句放在該檔案中
# 2)執行啟動worker的命令:celery worker -a 包名 -l info -p eventlet
# 注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 模組名 -l info
# 注:包名隨意
traceback (most recent call last):
result = (true, prepare_result(fun(*args, **kwargs)))
tasks, accept, hostname = _loc
valueerror: not enough values to unpack (expected 3, got 0)
安裝 pip install eventlet
重新輸入如下名命令
# celery_task是包名,包下必須要有乙個叫celery的檔案
celery worker -a celery_task -l info -p eventlet
worker執行頁面import celery
# broker儲存的位置
broker = 'redis:'
# backend儲存的位置
backend ='redis:'
# 例項化的celery物件
# 需要新增的任務
def add(x,y):
print(x*y)
return x+y
broker提交任務的頁面from celery_test import add
# 執行這個檔案,就是把這個任務新增到資料庫中,只要worker在工作
# 就會把這個任務從資料庫1中拿出來執行,並把結果放到資料庫2中
ret = add.delay(3,4)
# ret 是這個任務的uuid,用於獲取任務結果
backend獲取任務結果的頁面from celery.result import asyncresult
# 任務物件的唯一標識:uuid
id = '19dc2faa-39f9-47b6-af77-e9d3a4d05d2e'
if __name__ == '__main__':
if async1.successful():
result = async1.get()
print(result)
elif async1.failed():
print('任務失敗')
elif async1.status == 'pending':
print('任務等待中被執行')
elif async1.status == 'retry':
print('任務異常後正在重試')
elif async1.status == 'started':
print('任務已經開始被執行')
celery.py
import celery
broker = 'redis:'
backend ='redis:'
編寫需要新增的任務也在這個包內,可建立不同的任務檔案,可新增多個
task1.py
def add(x,y):
print(x,y)
return x+y
新增任務頁面
# 執行延遲任務就是多個乙個時間引數
# 這裡注意,時間引數是根據utc時間,並不是中國時間
from datetime import datetime, timedelta
# 時間物件必須和時間物件相加
eta=datetime.utcnow() + timedelta(seconds=10)
celery頁面# 時區
# 是否使用utc
# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab
'low-task':
}# 定時任務的新增必須要新啟動乙個beat命令去工作
# celery beat -a celery_task -l info
# 載入django配置環境
import os
os.environ.setdefault('django_settings_module', 'luffyapi.settings.dev')
# 例項化celery,獲取worker物件,include新增可處理的任務函式
from celery import celery
broker = 'redis:' # 資料儲存位置
backend = 'redis:' # 返回值儲存位置
# 時區
# 是否使用utc時間
# 任務的定時配置
from datetime import timedelta
from celery.schedules import crontab
'update-banner-list':
}
from django.core.cache import cache
from django.conf import settings
from home import serializer, models
def update_banner_list():
queryset = models.banner.objects.filter(is_delete=false, is_show=true).order_by('-display_order')[:settings.banner_count]
ser = serializer.bannermodelserializer(instance=queryset, many=true)
banner_list = ser.data
for banner in banner_list:
banner['banner_url'] = '' % banner['banner_url']
cache.set('banner_list', banner_list, 60*60*24)
return true
celery配置與基本使用
定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果儲存位置 mycelery broker redis 任務存放的地方 backend redis 結果存放的地方 defadd x,y return x y 1.啟動celery 1.1 單程序啟動celery celery a...
celery配置與基本使用
定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果儲存位置 mycelery broker redis 任務存放的地方 backend redis 結果存放的地方 defadd x,y return x y 測試時一般使用併發數高的 1.啟動celery 1.1 單程序啟動ce...
Celery配置與基本使用
1.1安裝celery pip install celery 1.2新建celery main.py配置celery celery task main.py import osfrom celery import celery 定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果...