celery 中任務的結構以及執行

2021-07-02 14:03:01 字數 1191 閱讀 7851

起因

最近打算實現非同步任務,回想起當年看celery的場景,重新整理下celery的機制

1. 任務入佇列

假定乙個函式定義如下

def

add(a, b, c=0):

print a + b + c

任務被序列化後,以字串的形式入佇列

, "delivery_mode": 2

, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"

}, "content-encoding": "binary"

}

展開

,

, "properties": ,

"delivery_mode": 2

, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"

}, "content-encoding": "binary"

}

body 中儲存有task需要執行的所有資訊,預設情況下, 它的編碼方式是

dict –> pickle 編碼 –> base64編碼 –> 字串

解碼後的body 形如

,

'eta': none,

'id': '06f331d5-aae9-4ef7-945f-3a73758e6b62'

}

其中重要的字段是

tasku』tasks.add』是函式的全路徑,包含包路徑

args[15, 20]函式引數

kwargs函式引數

2. 任務執行

函式的執行可以直接簡化成

def

add(a, b, c=0):

print a + b + c

globals()['add'](*[15, 20], **)

3. 後記

當然celery本身比這複雜的多,任務編碼的方式可以自己指定,worker要能執行某個任務,任務的資訊是需要提前註冊的。

celery動態任務元件Demo以及原理

celery是乙個基於python的分布式排程系統,文件在這 最近有個需求,想要動態的新增任務而不用重啟celery服務,找了一圈沒找到什麼好辦法 也有可能是文件沒看仔細 所以只能自己實現囉 為celery動態新增任務,首先我想到的是傳遞乙個函式進去,讓某個特定任務去執行這個傳遞過去的函式,就像這樣...

Celery的基本使用,以及完整非同步任務的執行

1.非同步任務celery和中間人rabbitmq 1.1 celery提供非同步任務,遇到耗時操作的任務都可以交給celery來完成 1.2 通訊過程是,生產者 任務發布者 訊息佇列 broker rabbitmq或者是redis 消費者 任務執行人 生產者把任務快取在訊息佇列中,消費者從任務佇列...

非同步任務佇列Celery在Django中的使用

前段時間在django web平台開發中,碰到一些請求執行的任務時間較長 幾分鐘 為了加快使用者的響應時間,因此決定採用非同步任務的方式在後台執行這些任務。在同事的指引下接觸了celery這個非同步任務佇列框架,鑑於網上關於celery和django結合的文件較少,大部分也只是粗粗介紹了大概的流程,...