目錄1.3 django中應用celery
1.4 celery定時執行
1.5 週期性定時任務
1.6 任務繫結,記錄日誌,重試
1.7 啟用任務監控
celery面試總結
celery是由python開發的乙個簡單、靈活、可靠的處理大量任務的分發系統,它不僅支援實時處理,也支援任務排程。
支援多個broker和worker來實現高可用和分布式。
將一些耗時的任務 扔到broker佇列中,並且會返回乙個任務id,可以通過任務id去backend佇列中獲取結果。 worker從broker獲取任務去執行,並將結果返回到backend佇列中。
函式名、引數 傳入broker
1.1 環境的搭建pip3 install celery==4.4
安裝broker: redis或rabbitmq
pip3 install redis / pika
1.2 快速使用
任務超時限制
避免某些任務一直處於非正常的進行中狀態,阻塞佇列中的其他任務。應該為任務執行設定超時時間。如果任務超時未完成,則會將 worker 殺死,並啟動新的 worker 來替代。
執行程式:
啟動redis
啟動worker
# 首先要進入當前目錄
celery worker -a s1 -l info
# -a s1 找到專案
# -l info 是列印日誌log,**上線時不加info
windows下會報乙個錯:
traceback (most recent call last):
file "d:\wupeiqi\py_virtual_envs\auction\lib\site-packages\billiard\pool.py", line 362, in workloop
result = (true, prepare_result(fun(*args, **kwargs)))
tasks, accept, hostname = _loc
valueerror: not enough values to unpack (expected 3, got 0)
解決安裝:
pip install eventlet
celery worker -a s1 -l info -p eventlet
建立任務,放入broker
python s2.py
python s2.py
檢視任務狀態
# 在s3.py填寫任務id
ptyhon s3.py
取消任務
celery_control.revoke(id, terminate=true)
1.3 django中應用celery
在django中用django-celery。
# pip3 install django-celery (沒有用到,還是用的celery模組)
之後,需要按照django-celery的要求進行編寫**。
啟動django程式
python manage.py ....
1.4 celery定時執行
from celery.result import asyncresult
def time_task(request):
"""定時執行
:param request:
:return:
"""# 獲取本地時間
ctime = datetime.datetime.now()
# 轉換成utc時間
utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=60) # 60s後執行
ctime_x = utc_ctime + s10 # 執行的時間
# print(result_object.status) # 獲取狀態
# data = result_object.get() # 獲取資料
# result_object.forget() # 把資料在backend中移除
# result_object.revoke(terminate=true) # 取消任務terminate=true強制取消
# 通過狀態絕對返回方式
if result_object.successful():
# 成功
data = result_object.get()
result_object.forget()
elif result_object.failed():
# 失敗
data = '執行失敗!'
else:
data = '執行中!'
支援的引數 :
1.5 週期性定時任務
task與shared_task裝飾器的區別:
task是通過建立的celery物件進行呼叫
例如:
def x1(x, y):
return x - y
def x2(x, y):
return x * y
多用於單一檔案中,不用直接載入到記憶體。當有多個celery物件時,任務函式可以明確使用某乙個來裝飾。
shared_task多用與多個檔案使用celery,一般在celery.py中只建立乙個celery物件,例如django整合celery。在專案啟動時,會將celery物件載入到記憶體,而@shared_task會自動將寫在各個應用下task.py的任務交與記憶體中的celery物件,復用性較強。
例如: @shared_task
def x1(x, y):
return x - y
from web import tasks
tasks.x1.delay(1,5)
但當在celery.py中建立多個celery物件時,不同任務使用不同的物件,這個時候就需要指明物件名。
例如: from web import tasks
1.6 任務繫結,記錄日誌,重試# 修改 tasks.py 檔案.
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
def div(self, x, y):
logger.info(('executing task id , args: '
'kwargs: ').format(self.request))
try:
result = x/y
except zerodivisionerror as e:
raise self.retry(exc=e, countdown=5, max_retries=3) # 發生 zerodivisionerror 錯誤時, 每 5s 重試一次, 最多重試 3 次.
return result
當使用bind=true
引數之後, 函式的引數發生變化, 多出了引數 self, 這這相當於把 div 程式設計了乙個已繫結的方法, 通過 self 可以獲得任務的上下文.
1.7 啟用任務監控
相比檢視日誌,flower 的 web 介面會顯得更加友好。
flower 的 supervisor 管理配置檔案:
[program:flower]
command=/opt/pyprojects/venv/bin/flower -a celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555
autostart=true
autorestart=true
startretries=3
user=derby
stdout_logfile=/var/logs/%(program_name)s.log
stdout_logfile_maxbytes=50mb
stdout_logfile_backups=30
stderr_logfile=/var/logs/%(program_name)s-error.log
stderr_logfile_maxbytes=50mb
stderr_logfile_backups=3
1. celery是乙個由python開發的乙個簡單、靈活、可靠的,能夠處理大量任務的系統,可以做任務的分發,也能夠做定時任務。多用於耗時的操作。例如傳送簡訊、郵箱這些功能就能使用celery做任務分發。
2. @shared_task/@task裝飾的函式說明是這乙個celery任務,會新增broker中。
3. 函式名.delay(引數),會去呼叫且執行任務,並且返回任務id。
6. revoke()可以取消任務。
Celery 定時任務
project celery task celery包 init py 包檔案 celery.py celery連線和配置相關檔案,且名字必須交celery.py tasks.py 任務體函式檔案 注意 任務體 檔案要與celery.py檔案在同乙個包下 beat也是乙個socket,啟動後會根據配...
Celery 定時任務
celery beat是乙個排程程式,它定期啟動任務,然後由集群中的可用工作程式節點執行任務。預設情況下,條目是從 beat schedule 設定中獲取的 但也可以使用自定義儲存,例如將條目儲存在sql資料庫中。必須確保一次只有乙個排程程式針對乙個排程任務執行,否則最終將導致重複的任務。使用集中式...
celery定時任務
預先在django中配置好celery,接下來著手配置下定時任務。時區修改為本地時區 在setttings.py 新增以下任意一行 celery timezone asia shanghai 如果使用了django celery beat持久化到資料庫,那麼需要手動執行更新命令 python man...