起因
最近打算實現非同步任務,回想起當年看celery的場景,重新整理下celery的機制
1. 任務入佇列
假定乙個函式定義如下
def
add(a, b, c=0):
print a + b + c
任務被序列化後,以字串的形式入佇列
, "delivery_mode": 2
, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"
}, "content-encoding": "binary"
}
展開
,
, "properties": ,
"delivery_mode": 2
, "delivery_tag": "2e1bc567-980d-46a0-94d4-d9ad030973d3"
}, "content-encoding": "binary"
}
body 中儲存有task需要執行的所有資訊,預設情況下, 它的編碼方式是
dict –> pickle 編碼 –> base64編碼 –> 字串
解碼後的body 形如
,
'eta': none,
'id': '06f331d5-aae9-4ef7-945f-3a73758e6b62'
}
其中重要的字段是
task
u』tasks.add』是函式的全路徑,包含包路徑
args
[15, 20]函式引數
kwargs
函式引數
2. 任務執行
函式的執行可以直接簡化成
def
add(a, b, c=0):
print a + b + c
globals()['add'](*[15, 20], **)
3. 後記
當然celery本身比這複雜的多,任務編碼的方式可以自己指定,worker要能執行某個任務,任務的資訊是需要提前註冊的。
celery動態任務元件Demo以及原理
celery是乙個基於python的分布式排程系統,文件在這 最近有個需求,想要動態的新增任務而不用重啟celery服務,找了一圈沒找到什麼好辦法 也有可能是文件沒看仔細 所以只能自己實現囉 為celery動態新增任務,首先我想到的是傳遞乙個函式進去,讓某個特定任務去執行這個傳遞過去的函式,就像這樣...
Celery的基本使用,以及完整非同步任務的執行
1.非同步任務celery和中間人rabbitmq 1.1 celery提供非同步任務,遇到耗時操作的任務都可以交給celery來完成 1.2 通訊過程是,生產者 任務發布者 訊息佇列 broker rabbitmq或者是redis 消費者 任務執行人 生產者把任務快取在訊息佇列中,消費者從任務佇列...
非同步任務佇列Celery在Django中的使用
前段時間在django web平台開發中,碰到一些請求執行的任務時間較長 幾分鐘 為了加快使用者的響應時間,因此決定採用非同步任務的方式在後台執行這些任務。在同事的指引下接觸了celery這個非同步任務佇列框架,鑑於網上關於celery和django結合的文件較少,大部分也只是粗粗介紹了大概的流程,...