#!/usr/bin/env python3
# coding: utf-8
from celery import celery
import settings
pw = settings.session_redis['password']
celery_broker = 'redis://:%s@localhost:6379/0' % pw
celery_backend = celery_broker
def analysis_main_12(current_id_str, q_num_str):
pass
def analysis_main_3(current_id_str, q_num_str):
pass
from celery_tasks import analysis_main_12, analysis_main_3
def main():
......
q = get_q3_from_db()
q = get_q12_from_db()
......
if __name__ == '__main__':
main()
## 消費者(worker)
- 檔案1:定義任務函式
```import config
from kombu import queue, exchange
from celery import celery
logging.basicconfig(level=logging.debug,
format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s:\t%(message)s')
celery_queues = (
queue('for_q_type3', exchange('for_q_type3'), routing_key='for_q_type3'), # consumer_arguments=),
queue('for_q_type12', exchange('for_q_type12'), routing_key='for_q_type12'), # consumer_arguments=),
queue('default', exchange('default'), routing_key='default'),
) # consumer_arguments= 數字越大,優先順序越高 - only for rabbitmq?
- 啟動命令 或 docker的entrypoint.sh
例如,這是乙個entrypoint.sh:
```bash
#!/bin/sh
echo executing entrypoint.sh ...
上述配置中需要注意生產者、消費者和啟動命令三者所用的queue是相對應的,不要寫錯。
上述配置只驗證了多個任務佇列,至於優先佇列功能是否有效未做驗證。 - priority queue 參考:
Celery多佇列配置
專案結構 proj init celeryconfig.py celery的配置檔案 tasks.py celery編寫任務檔案 coding utf 8 from future import absolute import from celery import celery proj includ...
celery清空佇列
本人在使用python celery佇列時,有時候需要清空佇列,但是python celery似乎並沒有清空佇列的api 至少我沒找到 所以使用redis cli工具進行佇列清空 使用redis cli工具,命令如下,在終端直接輸入 redis cli n 15 ltrim transcode 0 ...
Celery任務佇列
使用任務佇列作為分發任務的機制。乙個任務佇列的輸入是一組被稱為任務的工作單元。專用的工人會持續監聽任務佇列來等待完成新的工作。celery通過訊息進行通訊,通常使用中間人作為客戶端和工人 workers 間的媒介。為了初始化一項任務,客戶端會新增一條訊息到佇列中,然後中間人傳遞這條訊息給乙個work...