Celery的任務分發與定時任務

2022-06-27 06:06:10 字數 4948 閱讀 2207

目錄1.3 django中應用celery

1.4 celery定時執行

1.5 週期性定時任務

1.6 任務繫結,記錄日誌,重試

1.7 啟用任務監控

celery面試總結

celery是由python開發的乙個簡單、靈活、可靠的處理大量任務的分發系統,它不僅支援實時處理,也支援任務排程。

支援多個broker和worker來實現高可用和分布式。

將一些耗時的任務 扔到broker佇列中,並且會返回乙個任務id,可以通過任務id去backend佇列中獲取結果。 worker從broker獲取任務去執行,並將結果返回到backend佇列中。

函式名、引數 傳入broker

1.1 環境的搭建
pip3 install celery==4.4

安裝broker: redis或rabbitmq

pip3 install redis / pika

1.2 快速使用

任務超時限制

避免某些任務一直處於非正常的進行中狀態,阻塞佇列中的其他任務。應該為任務執行設定超時時間。如果任務超時未完成,則會將 worker 殺死,並啟動新的 worker 來替代。

執行程式:

啟動redis

啟動worker

# 首先要進入當前目錄

celery worker -a s1 -l info

# -a s1 找到專案

# -l info 是列印日誌log,**上線時不加info

windows下會報乙個錯:

traceback (most recent call last):

file "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop

result = (true, prepare_result(fun(*args, **kwargs)))

tasks, accept, hostname = _loc

valueerror: not enough values to unpack (expected 3, got 0)

解決安裝:

pip install eventlet

celery worker -a s1 -l info -p eventlet

建立任務,放入broker

python s2.py

python s2.py

檢視任務狀態

# 在s3.py填寫任務id

ptyhon s3.py

取消任務

celery_control.revoke(id, terminate=true)

1.3 django中應用celery

在django中用django-celery。

# pip3 install django-celery (沒有用到,還是用的celery模組)
之後,需要按照django-celery的要求進行編寫**。

啟動django程式

python manage.py ....
1.4 celery定時執行

from celery.result import asyncresult

def time_task(request):

"""定時執行

:param request:

:return:

"""# 獲取本地時間

ctime = datetime.datetime.now()

# 轉換成utc時間

utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())

s10 = datetime.timedelta(seconds=60) # 60s後執行

ctime_x = utc_ctime + s10 # 執行的時間

# print(result_object.status) # 獲取狀態

# data = result_object.get() # 獲取資料

# result_object.forget() # 把資料在backend中移除

# result_object.revoke(terminate=true) # 取消任務terminate=true強制取消

# 通過狀態絕對返回方式

if result_object.successful():

# 成功

data = result_object.get()

result_object.forget()

elif result_object.failed():

# 失敗

data = '執行失敗!'

else:

data = '執行中!'

支援的引數 :

1.5 週期性定時任務

task與shared_task裝飾器的區別:

task是通過建立的celery物件進行呼叫

例如:

def x1(x, y):

return x - y

def x2(x, y):

return x * y

多用於單一檔案中,不用直接載入到記憶體。當有多個celery物件時,任務函式可以明確使用某乙個來裝飾。

shared_task多用與多個檔案使用celery,一般在celery.py中只建立乙個celery物件,例如django整合celery。在專案啟動時,會將celery物件載入到記憶體,而@shared_task會自動將寫在各個應用下task.py的任務交與記憶體中的celery物件,復用性較強。

例如: @shared_task

def x1(x, y):

return x - y

from web import tasks

tasks.x1.delay(1,5)

但當在celery.py中建立多個celery物件時,不同任務使用不同的物件,這個時候就需要指明物件名。

例如: from web import tasks

1.6 任務繫結,記錄日誌,重試
# 修改 tasks.py 檔案.

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

def div(self, x, y):

logger.info(('executing task id , args: '

'kwargs: ').format(self.request))

try:

result = x/y

except zerodivisionerror as e:

raise self.retry(exc=e, countdown=5, max_retries=3) # 發生 zerodivisionerror 錯誤時, 每 5s 重試一次, 最多重試 3 次.

return result

當使用bind=true引數之後, 函式的引數發生變化, 多出了引數 self, 這這相當於把 div 程式設計了乙個已繫結的方法, 通過 self 可以獲得任務的上下文.

1.7 啟用任務監控

相比檢視日誌,flower 的 web 介面會顯得更加友好。

flower 的 supervisor 管理配置檔案:

[program:flower]

command=/opt/pyprojects/venv/bin/flower -a celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555

autostart=true

autorestart=true

startretries=3

user=derby

stdout_logfile=/var/logs/%(program_name)s.log

stdout_logfile_maxbytes=50mb

stdout_logfile_backups=30

stderr_logfile=/var/logs/%(program_name)s-error.log

stderr_logfile_maxbytes=50mb

stderr_logfile_backups=3

1. celery是乙個由python開發的乙個簡單、靈活、可靠的,能夠處理大量任務的系統,可以做任務的分發,也能夠做定時任務。多用於耗時的操作。例如傳送簡訊、郵箱這些功能就能使用celery做任務分發。

2. @shared_task/@task裝飾的函式說明是這乙個celery任務,會新增broker中。

3. 函式名.delay(引數),會去呼叫且執行任務,並且返回任務id。

6. revoke()可以取消任務。

Celery 定時任務

project celery task celery包 init py 包檔案 celery.py celery連線和配置相關檔案,且名字必須交celery.py tasks.py 任務體函式檔案 注意 任務體 檔案要與celery.py檔案在同乙個包下 beat也是乙個socket,啟動後會根據配...

Celery 定時任務

celery beat是乙個排程程式,它定期啟動任務,然後由集群中的可用工作程式節點執行任務。預設情況下,條目是從 beat schedule 設定中獲取的 但也可以使用自定義儲存,例如將條目儲存在sql資料庫中。必須確保一次只有乙個排程程式針對乙個排程任務執行,否則最終將導致重複的任務。使用集中式...

celery定時任務

預先在django中配置好celery,接下來著手配置下定時任務。時區修改為本地時區 在setttings.py 新增以下任意一行 celery timezone asia shanghai 如果使用了django celery beat持久化到資料庫,那麼需要手動執行更新命令 python man...