Celery的基本使用

2022-01-29 20:54:55 字數 4680 閱讀 9054

celery是乙個簡單、靈活且可靠的,處理大量訊息的分布式系統 專注於實時處理的非同步任務佇列 同時也支援任務排程

celery 官網:

celery 官方文件英文版:

celery 官方文件中文版:

ps:celery在windows系統上會出現不相容的情況
1. 可以不依賴任何伺服器,通過自身命令,啟動服務(內部支援socket)

2. celery服務為為其他專案服務提供非同步解決任務需求的

注:會有兩個服務同時執行,乙個是專案服務,乙個是celery服務,專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求

人是乙個獨立執行的服務 | 醫院也是乙個獨立執行的服務

正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題

人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求

celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,rabbitmq, redis等等
worker是celery提供的任務執行的單元,worker併發的執行在分布式的系統節點中。
task result store用來儲存worker執行的任務的結果,celery支援以不同方式儲存任務的結果,包括amqp, redis等
延遲執行:解決延遲任務

定時執行:解決週期(週期)任務,比如每天資料統計

pip install celery
訊息中介軟體:rabbitmq/redis

# 如果 celery物件:celery(...) 是放在乙個模組下的

# 1)終端切換到該模組所在資料夾位置:scripts

# 2)執行啟動worker的命令:celery worker -a 模組名 -l info -p eventlet

# 注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 模組名 -l info

# 注:模組名隨意

# 如果 celery物件:celery(...) 是放在乙個包下的

# 1)必須在這個包下建乙個celery.py的檔案,將celery(...)產生物件的語句放在該檔案中

# 2)執行啟動worker的命令:celery worker -a 包名 -l info -p eventlet

# 注:windows系統需要eventlet支援,linux與macos直接執行:celery worker -a 模組名 -l info

# 注:包名隨意

traceback (most recent call last):

result = (true, prepare_result(fun(*args, **kwargs)))

tasks, accept, hostname = _loc

valueerror: not enough values to unpack (expected 3, got 0)

安裝 pip install eventlet

重新輸入如下名命令

# celery_task是包名,包下必須要有乙個叫celery的檔案

celery worker -a celery_task -l info -p eventlet

worker執行頁面
import celery

# broker儲存的位置

broker = 'redis:'

# backend儲存的位置

backend ='redis:'

# 例項化的celery物件

# 需要新增的任務

def add(x,y):

print(x*y)

return x+y

broker提交任務的頁面
from celery_test import add

# 執行這個檔案,就是把這個任務新增到資料庫中,只要worker在工作

# 就會把這個任務從資料庫1中拿出來執行,並把結果放到資料庫2中

ret = add.delay(3,4)

# ret 是這個任務的uuid,用於獲取任務結果

backend獲取任務結果的頁面from celery.result import asyncresult

# 任務物件的唯一標識:uuid

id = '19dc2faa-39f9-47b6-af77-e9d3a4d05d2e'

if __name__ == '__main__':

if async1.successful():

result = async1.get()

print(result)

elif async1.failed():

print('任務失敗')

elif async1.status == 'pending':

print('任務等待中被執行')

elif async1.status == 'retry':

print('任務異常後正在重試')

elif async1.status == 'started':

print('任務已經開始被執行')

celery.py

import celery

broker = 'redis:'

backend ='redis:'

編寫需要新增的任務也在這個包內,可建立不同的任務檔案,可新增多個

task1.py

def add(x,y):

print(x,y)

return x+y

新增任務頁面

# 執行延遲任務就是多個乙個時間引數

# 這裡注意,時間引數是根據utc時間,並不是中國時間

from datetime import datetime, timedelta

# 時間物件必須和時間物件相加

eta=datetime.utcnow() + timedelta(seconds=10)

celery頁面
# 時區

# 是否使用utc

# 任務的定時配置

from datetime import timedelta

from celery.schedules import crontab

'low-task':

}# 定時任務的新增必須要新啟動乙個beat命令去工作

# celery beat -a celery_task -l info

# 載入django配置環境

import os

os.environ.setdefault('django_settings_module', 'luffyapi.settings.dev')

# 例項化celery,獲取worker物件,include新增可處理的任務函式

from celery import celery

broker = 'redis:' # 資料儲存位置

backend = 'redis:' # 返回值儲存位置

# 時區

# 是否使用utc時間

# 任務的定時配置

from datetime import timedelta

from celery.schedules import crontab

'update-banner-list':

}

from django.core.cache import cache

from django.conf import settings

from home import serializer, models

def update_banner_list():

queryset = models.banner.objects.filter(is_delete=false, is_show=true).order_by('-display_order')[:settings.banner_count]

ser = serializer.bannermodelserializer(instance=queryset, many=true)

banner_list = ser.data

for banner in banner_list:

banner['banner_url'] = '' % banner['banner_url']

cache.set('banner_list', banner_list, 60*60*24)

return true

celery配置與基本使用

定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果儲存位置 mycelery broker redis 任務存放的地方 backend redis 結果存放的地方 defadd x,y return x y 1.啟動celery 1.1 單程序啟動celery celery a...

celery配置與基本使用

定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果儲存位置 mycelery broker redis 任務存放的地方 backend redis 結果存放的地方 defadd x,y return x y 測試時一般使用併發數高的 1.啟動celery 1.1 單程序啟動ce...

Celery配置與基本使用

1.1安裝celery pip install celery 1.2新建celery main.py配置celery celery task main.py import osfrom celery import celery 定義celery例項,需要的引數,1,例項名,2,任務發布位置,3,結果...