celery是乙個簡單,靈活且可靠的分布式系統,可以處理大量訊息,同時為操作提供維護該系統所需的工具。
這是乙個任務佇列,著重於實時處理,同時還支援任務排程。
celery通過訊息進行通訊,通常使用**在客戶端和工作人員之間進行調解。為了啟動任務,客戶端將訊息新增到佇列中,然後**將訊息傳遞給工作人員
celery_test工作目錄結構.
├── __init__.py
├── __pycache__
│ ├── __init__.cpython-37.pyc
│ ├── celery.cpython-37.pyc
│ ├── celeryconfig.cpython-37.pyc
│ └── tasks.cpython-37.pyc
├── celery.py #主程式
├── celeryconfig.py #配置檔案
└── tasks.py # 任務模組
celery.py
#
# python3開始,import 預設只做absolute import
# from __future__ import absolute_import
from celery import celery
from celery_test import celeryconfig
# 例項化celery物件,任務排程利器(分布式任務排程模組)
# 2.
if __name__ == '__main__':
celeryconfig.py
# celery本身不含訊息服務,它使用第三方訊息服務來傳遞任務
# 選redis作為訊息服務(broker),celery預設使用的是rabbitmq
# 也可以通過 -b 選項在命令列進行設定其他的中間人(broker)
broker_url = 'redis:'
# 儲存結果:如果想跟蹤任務狀態,celery需要儲存任務狀態資訊.
# celery 內建了一些後端結果:sqlalchemy/django orm、memcached、redis、 rpc (rabbitmq/amqp)以及自定義的後端結果儲存中介軟體
backend=''
# 指定序列化方式
task_serializer = 'json'
# 指定區時間
tiemzone = 'asia/shanghai'
# 匯入任務模組列表
include=['celery_test.tasks',]
tasks.py
# 裝飾器從所有可呼叫物件中建立任務類
def add(x,y):
return x+y
def mul(x,y):
return x*y
啟動celery worker
呼叫celery中的任務
>>> from celery_test.tasks import add,mul
# delay : returns: celery.result.asyncresult: future promise(非同步執行)
>>> res=add.delay(2,3)
>>> res
celery端會顯示
[2020-04-10 15:57:26,200: info/forkpoolworker-2]
task celery_test.tasks.add[fe2a3e01-b285-4f68-bd76-cdb5d4f3d15f] succeeded in 0.0014597050000002554s: 8
啟動celery worker 的debug模式
celery -a celery_test worker -l debug
呼叫celery中的任務–>celery端會顯示的更詳細:
分布式 分布式鎖
本質是利用redis的setnx 方法的特性來加鎖,setnx 即key不存在則設定key,否則直接返回false,要求在分布式系統中使用同乙個redis服務,以下提供兩種解決方案 1 直接使用redistemplate 這其實並不能完全保證高併發下的安全問題,因為可能在鎖過期之後該執行緒尚未執行完...
分布式 分布式事務
是資料庫執行過程中的乙個邏輯單位,由乙個有限的資料庫操作序列構成。事務的acid四大特性 原子性 atomicity 事務作為乙個整體被執行。一致性 consistency 從乙個一致的狀態轉換到另乙個一致的狀態。隔離性 isolation 多個事務併發執行時,併發事務之間互相影響的程度。永續性 d...
分布式之分布式事務
被人問到分布式事務,之前學rabbitmq 的時候學到過rabbitmq 高階的事務,因為沒有用過,所有沒有回答好。這裡總結一下。1.單機版事務。事務的四大特性 acid a.原子性 b.一致性 c.隔離性 d.永續性 單機事務可以通過設定事務的隔離級別 參見spring 的事務隔離級別 2.分布式...