Django分布式任務佇列celery的實踐

2021-08-28 21:37:38 字數 3983 閱讀 7114

broker 的選擇大致有訊息佇列和資料庫兩種,這裡建議盡量避免使用資料庫作為 broker,除非你的業務系統足夠簡單。在併發量很高的複雜系統中,大量 workers 訪問資料庫的行為會使得作業系統磁碟 i/o 一直處於高峰值狀態,非常影響系統效能。如果資料庫 broker 同時還兼顧著後端業務的話,那麼應用程式也很容易被拖垮。

反觀選擇訊息佇列,例如 rabbitmq,就不存在以上的問題。首先 rabbitmq 的佇列存放到記憶體中,速度快且不占用磁碟 i/o。再乙個就是 rabbitmq 會主動將任務推送給 worker,所以 worker 無需頻繁的去輪詢佇列,避免無謂的資源浪費。

def add(...):

如果你僅希望返回並持久化任務執行失敗的異常結果,以便於後續的調查分析,那麼你可以在使用資料庫作為 result backend 的同時應用下列配置:

# only store task errors in the result backend.
所謂事有輕重緩急,任務如是。例如,使用者的驗證碼簡訊比較緊急,應及時傳送,而宣傳簡訊則可以延後再發,以此提供更好的使用者體驗。

實現任務優先順序最簡單的思路就是,首先將任務進行合理分類,一般的我們會將實時任務、高頻率任務、短時間任務劃分為高優先順序任務;而定時任務、低頻率任務、長時間任務則為低優先順序任務。然後再為處理高優先順序任務的佇列分配更多的 worker。

不過這種簡單粗暴的方式還存在乙個問題,當高優先順序任務被消費完後,相應的workers 就會空閒下來,非常浪費系統資源。那麼改善的方法就是,「在高優先順序任務佇列始終擁有更多 worker 的前提下,當這些 worker 空閒時,也可以用於處理低優先順序的任務」。利用 worker 多佇列訂閱特性即可實現這個效果,例如,現在有 high_queue、low_queue 以及 worker_1、2、3。那麼就可以讓 worker_1、2、3 均訂閱 high_queue 的同時,也讓 worker_2、3 訂閱 low_queue。

celery worker 支援下列四種併發方式。

通過配置項 worker_pool 指定,預設為 prefork:

# single-threaded execution pool
同時還可以通過配置項 worker_concurrency 來指定併發池的 size,預設為執行環境的 cpu 數量:

回到正題,當我們選擇使用 prefork/gevent 併發方式時,建議應用 worker 併發池的 autoscale 自動適配功能,在 celery cli 中使用--autoscale選項來指定併發池的上下限。例如:

celery worker -a proj --autoscale=6,3
但需要注意的是,無論是 worker 的數量還是併發池的數量都並非越多越好,畢竟其自身的存在就需要消耗系統資源。但有乙個原則是,當你的任務為 i/o 密集型時,可以適量增大併發池的 size;如果你的任務為 cpu 密集型時,預設 size 不失為乙個保險的選擇。總而言之,最佳配比需要結合自身實際情況不斷的嘗試得出。

prefetch 預取數是繼承至 rabbitmq 的原語,即為 worker 一次從佇列中獲取的任務訊息的數量。任務的執行時間有長有短,我們應該為短時間任務設定更大的任務預取數,以降低獲取任務帶來的資源消耗。

通過配置項 worker_prefetch_multiplier 來指定全域性預取數乘子,預設為 4。當設定為 1 時,表示 disable 預取功能;當設定為 0 時,表示 worker 會盡可能多的獲取任務。

# prefetch_count =  worker_prefetch_multiplier * concurrent_processes_count
如果你的任務既有長任務,又有短任務,那麼這裡建議你應用分開配置的 worker 。以檔案上傳為例,上傳小檔案(小於 1mb)的數量要遠大於上傳大檔案(大於 20mb)的數量。那麼小檔案上傳任務就屬於高頻短任務,而大檔案上傳任務則是低頻長任務。分別實現 queue_small/worker_small_1、2 以及 queue_big/worker_big 來處理,同時應該為 worker_small_1、2 設定更大的 prefetch。

# filename: big_prefetch.py

celeryd_prefetch_multiplier = 10

# filename: small_prefetch.py

celeryd_prefetch_multiplier = 100

celery worker -a proj -q queue_small --config big_prefetch -n worker_small_1

celery worker -a proj -q queue_small --config small_prefetch -n worker_small_2

celery worker -a proj -q queue_big --config big_prefetch -n worker_big

celery 雖然提供了任務異常重試,但卻無法保證任務的事務性,即不提供任務狀態的回滾能力。所以為了讓任務更易於部署和重試,應該盡量將乙個長任務拆解為多個符合冪等性的短任務。

冪等(idempotent)是乙個數學概念,常見於抽象代數。冪等性函式的特徵為「如果接受到相同的實參,那麼無論重複執行多少次,都能得到相同的結果」。例如,get_user_name()set_true()均屬冪等函式。

可見冪等性任務結合任務異常重試,能夠非常有效的提高任務執行的健壯性。

避免某些任務一直處於非正常的進行中狀態,阻塞佇列中的其他任務。應該為任務執行設定超時時間。如果任務超時未完成,則會將 worker 殺死,並啟動新的 worker 來替代。

def add(...):

celery 支援 group/chain/chord/chunks/map/starmap 等多種工作流原語,基本可以覆蓋大部分複雜的任務組合需求,善用任務工作流能夠更好的應用 celery 優秀的併發特性。例如,如果下一步任務需要等待上一步任務的執行結果,那麼不應該單純的應用 get 方法來實現同步子任務,而是應該使用 chain 任務鏈。

使用 rabbitmq 充當 broker,可以應用 rabbitmq 的 ack 機制來保證任務有效傳遞。但在任務執行要求非常嚴格的場景中,「有效傳遞」顯然是不夠的,「有效執行」才可以。

為了支援「有效執行」,celery 在 ack 的基礎上提供了 ack_late 機制。即只有當任務完成(成功/失敗)後,再向 broker 回傳 ack。而代價就是訊息佇列的效能會降低,畢竟任務訊息占用佇列資源的時間變長了。

通常的,對於一些以小時為單位的長時間任務,我會建議實現一次只保留一項任務的 ack late 方式。

def add(...):

有時候任務執行需要物件的參與,此時建議傳遞物件的唯一標識,而非直接將物件序列化後再傳遞。例如,不要嘗試將資料庫的 orm 物件作為任務訊息傳遞,而是傳遞 orm 物件的主鍵 id。當任務執行到需要使用 orm 物件時,再通過 id 從資料庫實時獲取,避免 orm 物件因為佇列阻塞導致與資料庫實時記錄不一致的情況。

同乙個 worker 在執行了大量任務後,會有機率出現記憶體洩漏的情況。這裡建議全域性設定 worker 最大的任務執行數,worker 在完成了最大的任務執行數後就主動退出。

定時任務的排程計畫要經過科學合理的設計,一般的,我們建議遵守以下幾點原則:

與系統管理員和資料庫管理員溝通,確保你預期的排程時間不會與他們的定時任務衝突。

將定時排程任務分散到各個時間點執行,均衡負載。

要考慮執行定時任務對生產業務系統的影響,盡可能在業務低峰期執行。

相比檢視日誌,flower 的 web 介面會顯得更加友好。

分布式任務佇列Celery

celery 芹菜 是基於python開發的分布式任務佇列。它支援使用任務佇列的方式在分布的機器 程序 執行緒上執行任務排程。基本用法是在程式裡引用celery,並將函式方法繫結到task from celery import celery def add x,y return x y from t...

python Celery分布式任務佇列

celery是乙個簡單,靈活且可靠的分布式系統,可以處理大量訊息,同時為操作提供維護該系統所需的工具。這是乙個任務佇列,著重於實時處理,同時還支援任務排程。celery通過訊息進行通訊,通常使用 在客戶端和工作人員之間進行調解。為了啟動任務,客戶端將訊息新增到佇列中,然後 將訊息傳遞給工作人員 ce...

Celery分布式任務佇列

celery是乙個簡單 靈活且可靠的,處理大量訊息的分布式系統 專注於實時處理的非同步任務佇列 同時也支援任務排程 celery的架構由三部分組成,訊息中介軟體 message broker 任務執行單元 worker 和任務執行結果儲存 task result store 組成。訊息中介軟體 ce...