celery的實踐指南
celery原理:
celery實際上是實現了乙個典型的生產者-消費者模型的訊息處理/任務排程統,消費者(worker)和生產者(client)都可以有任意個,他們通過訊息系統(broker)來通訊。
典型的場景為:
客戶端啟動乙個程序(生產者),當使用者的某些操作耗時較長或者比較頻繁時,考慮接入本訊息系統,傳送乙個task任務給broker。
後台啟動乙個worker程序(消費者),當發現broker中儲存有某個任務到了該執行的時間,他就會拿過來,根據task型別和引數執行。
實踐中的典型場景:
執行celery的worker,讓他作為consumer執行,自動從broker上獲得任務並執行。`celery -a celery_demo worker`
執行celery的client,讓其根據schedule,自動生產出task msg,並發布到broker上。`celery -a celery_demo beat`
安裝並執行flower,方便監控task的執行狀態`celery flower -a celery_demo`
或者設定登入密碼 `
celery flower -a celery_demo --basic_auth=user1:password1,user2:password2
多同步任務-鏈式任務-
失敗自動重試的task失敗重試方法: 將task**函式引數增加self,同時繫結bind。
自動重試後,是否將任務重新入queue後排隊,還是等待指定的時間?可以通過self.retry()引數來指定。
派發到不同queue佇列的task乙個task自動對映到多個queue中的方法, 通過配置task和queue的routing_key命名模式。比如:把queue的exchange和routing_key配置成通用模式:
再定義task的routing_key的名稱:
可用的不同exchange策略:direct:直接根據定義routing_key
topic:exchange會根據萬用字元來將乙個訊息推送到多個queue。
fanout:將訊息拆分,分別推送到不同queue,通常用於超大任務,耗時任務。
參考:高階配置result是否儲存
失敗郵件通知:
關閉rate limit:
auto_reload方法(*nix系統):celery通過監控源**目錄的改動,自動地進行reload
使用方法:1.依賴inotify(linux) 2. kqueue(os x / bsd)
安裝依賴:
$ pip install pyinotify
(可選) 指定fsnotify的依賴:
$ env celeryd_fsnotify=stat celery worker -l info --autoreload
auto-scale方法:啟用auto-scale
臨時增加worker程序數量(增加consumer):
$ celery -a proj control add_consumer foo -d worker1.local
臨時減少worker程序數量(減少consumer):
啟動停止worker的方法:啟動 as daemon :
root使用者可以使用celerydom
e/ru
n/ce
lery
/'>home/run/celery/
home/run/celery/home/log/celery/%n.log"
或者 celery worker —detach
停止ps auxww | grep 'celery worker' | awk '' | xargs kill -9
與flask整合的方法整合後flask將充當producer來建立並傳送task給broker,在celery啟動的獨立worker程序將從broker中獲得task並執行,同時將結果返回。
flask獲得
與flask整合後的啟動問題由於celery的預設routing_key是根據生產者在**中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在專案頂級目錄上,否者會出現keyerror。
效能提公升: eventlet 和 greenlet
官方參考:
celery的實踐指南
celery原理:
celery實際上是實現了乙個典型的生產者-消費者模型的訊息處理/任務排程統,消費者(worker)和生產者(client)都可以有任意個,他們通過訊息系統(broker)來通訊。
典型的場景為:
客戶端啟動乙個程序(生產者),當使用者的某些操作耗時較長或者比較頻繁時,考慮接入本訊息系統,傳送乙個task任務給broker。
後台啟動乙個worker程序(消費者),當發現broker中儲存有某個任務到了該執行的時間,他就會拿過來,根據task型別和引數執行。
實踐中的典型場景:
執行celery的worker,讓他作為consumer執行,自動從broker上獲得任務並執行。`celery -a celery_demo worker`
執行celery的client,讓其根據schedule,自動生產出task msg,並發布到broker上。`celery -a celery_demo beat`
安裝並執行flower,方便監控task的執行狀態`celery flower -a celery_demo`
或者設定登入密碼 `
celery flower -a celery_demo --basic_auth=user1:password1,user2:password2
多同步任務-鏈式任務-
失敗自動重試的task失敗重試方法: 將task**函式引數增加self,同時繫結bind。
自動重試後,是否將任務重新入queue後排隊,還是等待指定的時間?可以通過self.retry()引數來指定。
派發到不同queue佇列的task乙個task自動對映到多個queue中的方法, 通過配置task和queue的routing_key命名模式。比如:把queue的exchange和routing_key配置成通用模式:
再定義task的routing_key的名稱:
可用的不同exchange策略:direct:直接根據定義routing_key
topic:exchange會根據萬用字元來將乙個訊息推送到多個queue。
fanout:將訊息拆分,分別推送到不同queue,通常用於超大任務,耗時任務。
參考:高階配置result是否儲存
失敗郵件通知:
關閉rate limit:
auto_reload方法(*nix系統):celery通過監控源**目錄的改動,自動地進行reload
使用方法:1.依賴inotify(linux) 2. kqueue(os x / bsd)
安裝依賴:
$ pip install pyinotify
(可選) 指定fsnotify的依賴:
$ env celeryd_fsnotify=stat celery worker -l info --autoreload
auto-scale方法:啟用auto-scale
臨時增加worker程序數量(增加consumer):
$ celery -a proj control add_consumer foo -d worker1.local
臨時減少worker程序數量(減少consumer):
啟動停止worker的方法:啟動 as daemon :
root使用者可以使用celerydom
e/ru
n/ce
lery
/'>home/run/celery/
home/run/celery/home/log/celery/%n.log"
或者 celery worker —detach
停止ps auxww | grep 'celery worker' | awk '' | xargs kill -9
與flask整合的方法整合後flask將充當producer來建立並傳送task給broker,在celery啟動的獨立worker程序將從broker中獲得task並執行,同時將結果返回。
flask獲得
與flask整合後的啟動問題由於celery的預設routing_key是根據生產者在**中的import級別來設定的,所以worker端在啟動時應該注意其啟動目錄應該在專案頂級目錄上,否者會出現keyerror。
效能提公升: eventlet 和 greenlet
官方參考:
Celery入門指南
個人理解celery分布式訊息佇列就是乙個生產者消費者模式,celery產生任務交給中間人broker 在這裡使用redis作為中間人 中間人將任務分發給眾多的worker來完成任務。看乙個簡單的專案 建立tasks.py from celery import celery def add x,y ...
celery 視覺化 Celery 最佳實踐
1.用好celery beat 如果你想更好的管理專案的定時任務,可以用celery beat代替crontab管理。celery不僅支援動態的非同步任務 通過delay呼叫 也支援定時任務執行。當然我們可以用crontab實現任務的定時執行,但是crontab是與專案 隔離的,為了更方便地管理定時...
LINQ TO SQLite實踐指南
前言 當前,軟體應用程式中,資料庫已經成為不可缺少的重要組成部分.然而傳統資料庫正趨向巨無霸化,對系統的要求一步步提高,管理成本也越來越大,對於中小型專案的應用,它的很多功能變得越來越多餘,但是我卻不得不為這些不需要的功能付出更多的資金和人力成本.在這些場景,嵌入式資料庫的輕量,零部署,跨平台,義移...