celery 是 python 中的常用的任務佇列框架,經常用於非同步呼叫、後台任務等工作。celery 本身以 python 寫,但協議可在不同的語言中實現,其它語言也可以用 celery 執行相應的任務。在 web 應用,為提高系統響應速度,傳送郵件、資料整理等需要長時間執行的任務,通常以非同步任務的方式執行,這時就需要用到像 celery 類的框架。另一種常見的場景是大型系統的分布式處理,為了提公升系統效能,各個元件通常以多個例項執行不同主機上,而元件之間的呼叫就需要用到 celery 這樣的框架。使用 celery (或訊息佇列),有助於降低系統元件之間的耦合,有助於實現灰度發布、實現服務的分布式、實現水平擴充套件,最終提公升系統健壯性和處理效能。
celery (和類似框架)的核心是任務佇列。使用者發起任務,celery 負責把任務排隊和整理,然後交到任務執行器 worker 中。 worker 監視任務佇列,獲取新任務並執行。在 celery 內部,以訊息機制協調各個元件工作,訊息需要借助乙個中間人 broker 進行,如下 ::
client → celery task → broker → celery worker
↑ ↓
← ← ← ← result backend
client 發起任務時,一般是以非同步方式(除非必要的同步 rpc ),獲得乙個任務的 id 並儲存下來,後續可通過 id 到 result backend 中查詢任務執行結果。broker 是第三方元件,可使用訊息佇列( rabbitmq 等)、redis、資料庫等,只要能實現訊息的儲存和分發理論上都能使用。 worker 以執行緒或程序的形式執行,從 broker 中取任務執行,然後把結果儲存到 result backend 。
目前 rabbitmq 的 broker 實現的功能最完備,在開發環境中也可以使用 sqlite 等比較方便的方式,但效能會很差,不能用在生產環境上。
另外需要注意的是,由於不同作業系統的程序模型的差異,celery 會在 windows 上產生一些配置方面的怪異問題。
celery 可直接通過 pip 安裝,在 virtualenv 下,直接執行 ::
pip install celery
再安裝 broker 所需要的驅動,例如使用 rabbitmq ,則安裝 ::
pip install amqp
同時安裝好 rabbitmq (建議通過 docker 安裝,使用 rabbitmq:management 映象,可在 15672 埠檢視管理控制台)。
然後使用下面的**示例(摘錄來自: ask solem. 「celery manual, version 3.1「) ::
# hello.py
from celery import celery
def hello():
return 'hello world'
if __name__ == '__main__':
r = hello.delay()
然後,啟動 worker ::
celery -a hello worker --loglevel=info
client 執行任務 ::
python hello.py
上面的簡單例子是沒有引數的,如果增加引數,如下 ::
# add.py
from celery import celery
backend='db+sqlite:///celery_result.db')
def add(x, y):
return x+y
if __name__ == '__main__':
r = add.delay(1, 2)
print(r.wait())
啟動 worker ::
celery -a add worker --l info
呼叫 ::
python add.py
當任務結果用 amqp 儲存時,結果只能取一次, 因此無法在後續呼叫中查詢任務結果。這個例子用 sqlite 儲存了任務執行結果,因此 client 可在 r.wait() 查詢任務的結果、任務的狀態等等很多資訊,可把 r.id 儲存到資料庫,然後未來查詢任務的 asyncresult ::
print(r2.wait())
print(r2.successful())
add.py 中使用了兩個引數 x y ,而 celery 需要通過 broker 傳遞這兩個引數,這時需要對資料進行序列化,將 x y 物件轉換為無結構的資料,然後 worker 接收到後再把資料還原為 x y 物件。 celery 內建的序列化方法包括 pickle 、 json 等等,如果物件比較複雜,需要自己定義序列化方法。
如果不想立即執行任務,而是把任務傳遞到其它地方,通過 celery 的 subtask 支援。 subtask 是對 task 的呼叫引數和執行選項的乙個封裝,如 ::
add.subtask((2,2), countdown=10)
add.s(2,2)
subtask 或 s 返回的是乙個 task 的簽名(celery.canvas.signature),它可實現工作流、偏函式等效果。subtask 支援和 task 同樣的呼叫方法,如 ::
s = add.s(2) # subtask ,partial
s.delay(2) # 傳送訊息開始非同步執行
在 celery 工作流中組織 subtask 的方式有 group / chain / chord 等等, group 中任務併發執行,chain 中任務順序執行,chord 中進行**。而這些組織方式本身也是 subtask ,可巢狀使用 ::
# workflow.py
from celery import celery, group, chain
backend='db+sqlite:///celery_result.db')
def add(x, y):
return x+y
if __name__ == '__main__':
g = group((add.s(i, i) for i in range(10)))
r = g.delay()
print(r.get())
c = chain(add.s(1, 2) | add.s(3))
r2 = c.delay()
print(r2.get())
celery 的任務呼叫通過網路傳送任務的名字和引數,不傳送任務**, worker 收到任務後根據任務名和引數執行相應的**。因此不同 worker 中的**版本不一樣時,會有不同的處理結果。如果 worker 中不能處理相應的任務名,就會報錯。 Kali Linux學習01(介紹)
kali linux是乙個高階滲透測試和安全審計linux發行版。作為使用者,我簡單的把它理解為,乙個特殊的linux發行版,整合了精心挑選的滲透測試和安全審計的工具,供滲透測試和安全設計人員使用。也可稱之為平台或者框架。作為linux發行版,kali linux是在backtrack linux的...
非同步任務利器Celery 一 介紹
django專案開發中遇到過一些問題,傳送請求後伺服器要進行一系列耗時非常長的操作,使用者要等待很久的時間。可不可以立刻對使用者返回響應,然後在後台執行那些操作呢?crontab定時任務很難達到這樣的要求 非同步任務是很好的解決方法,有乙個使用python寫的非常好用的非同步任務工具celery。c...
非同步任務利器Celery 一 介紹
django專案開發中遇到過一些問題,傳送請求後伺服器要進行一系列耗時非常長的操作,使用者要等待很久的時間。可不可以立刻對使用者返回響應,然後在後台執行那些操作呢?crontab定時任務很難達到這樣的要求 非同步任務是很好的解決方法,有乙個使用python寫的非常好用的非同步任務工具celery。c...