學無止境 使用Celery執行有依賴關係的任務

2022-07-10 06:21:10 字數 3656 閱讀 6113

有 a-f 6個任務,它們的執行關係如下:

實現這樣乙個具有依賴關係的系列任務有很多種方法。這裡採用的是celery。

celery是分布式(非同步)任務佇列。

具體內容可以參考這裡 ->

示例程式的結構如下:

tasks.py:任務程式

sleep.sh:實際呼叫的任務指令碼

celeryconfig.py:配置檔案

log:存放log的目錄

下面從易到難分別講解。

sleep.sh,這是乙個dummy的任務指令碼,模擬了乙個耗時為3秒的測試任務。在任務程式tasks.py中,會呼叫到這個指令碼。

echo start

sleep 3

echo done

注:在以下示例程式中,所有任務都呼叫了同乙個指令碼,實際情況可能是乙個任務對應乙個指令碼。

job1根據傳入引數 name ,來分別呼叫指令碼執行不同的任務,然後記錄在對應的日誌檔案中。

import time

import subprocess

from celery import celery

# ini

# execute shell cmd

def execute(command):

p = subprocess.popen(command, shell=true, stdout=subprocess.pipe, stderr=subprocess.pipe)

stdout, stderr = p.communicate()

if p.returncode:

raise exception("%s exited with status %r: %r " % (command, p.returncode, stderr))

return stdout if stdout is none else stdout.strip()

def job1(name):

cmd_1 = '~/celery/sleep.sh > ~/celery/log/cmd1.log'

cmd_2 = '~/celery/sleep.sh > ~/celery/log/cmd2.log'

cmd_3 = '~/celery/sleep.sh > ~/celery/log/cmd3.log'

if name == "a":

execute(cmd_1)

return 1

elif name == "b":

execute(cmd_2)

return 2

elif name == "c":

execute(cmd_3)

return 3

return 0

celeryconfig.py,這裡主要配置了 backend 和 broker 。

result_backend = 'rpc://'

broker_url = 'redis://localhost'

參考這裡 ->

from tasks import job1

import time

time_interval = 0.5

def waitfor(input):

job = input[0]

name = input[1]

while not job.ready():

time.sleep(time_interval)

print("waiting for job " + name + " - " + job.id + " to complete ...")

print("done to run job " + name)

return

def runjob(name):

print("start to run job " + name)

job = job1.delay(name)

return [job,name]

# start job a and b

job_a = runjob("a")

job_b = runjob("b")

waitfor(job_a)

waitfor(job_b)

# start job c

job_c = runjob("c")

waitfor(job_c)

需要注意的是, concurrency 至少要》2,這樣才能兩個 job 同時執行,一般配置為和機器的cpu核數相等。

service docker start

docker run -d -p 6379:6379 redis

celery -a tasks worker --loglevel=info --concurrency=6

start to run job a

start to run job b

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

waiting for job a - e5897b71-8841-4628-9401-b3fa519109a9 to complete ...

done to run job a

done to run job b

start to run job c

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

waiting for job c - bf7d1a4f-c8d8-4f09-9600-bcfe877d8f10 to complete ...

done to run job c

學無止境 氣有浩然

學無止境 氣有浩然 山東大學 校訓 學無止境 意在 為學 它以永不滿足的執著精神激勵我們在學術和人生的里程中勇於挑戰極限,追求完美。氣有浩然 意 在 為人 它將我們應具有的道德情操 人文素養和精神品格皆寓意於這天地間的 正氣 之中。校訓 學無止境,氣有浩然 是以永不滿足的執著精神激勵廣大師生在學術和...

學無止境,堅持無價!

學習程式設計已有三年之久,正是加入軟體開發這個行業也有四個月的時間了。四個月可能對於大學生來說並不算什麼,但是對與我而言感覺時間每時每刻都在流逝,為什麼要這麼說呢!因為在外工作的這段時間我想了很多也想去做很多東西,有的時候忽然會覺得以前的時光都是自己浪費了,大學沒有想過自己花心思去學習一門技術也沒有...

學無止境 Unix與Linux

unix作業系統是由ken thompson和dennis ritchie於1969 1970年發明。它的部分技術 可以追溯到multics工程,後者因為過於龐大複雜而失敗。研究人員吸取教訓,實現了一種分時作業系統,即unix。multi是大的意思,而uni是小的意思。表示後者相當小巧。linux作...