python分配任務 Python 通用任務分發器

2021-10-13 02:08:33 字數 4113 閱讀 3276

應用場景

把乙個大任務分解成n個子任務並行執行,全部完成後可進行匯**計。

例如:訊息分發:將一條訊息傳送給100w使用者,可拆分成100個子任務,每個子任務傳送1w使用者;

資料處理:對1000w條資料記錄進行某種操作,可拆分成n個子任務並行執行;

特點一鍵執行,無須手動分解任務

支援動態加入新的worker程序

任務分發是負載均衡的

子任務完成後可進行匯**計

程序檢視

使用方法

任務呼叫器 task_dispatcher.py

#!/usr/bin/python

#coding: utf-8

# task dispatcher.

# binds push socket to tcp://localhost:5557

# sends batch of tasks to workers via that socket

# start after all workers started

# author: wilzhang

# created: 2015-07-29

import os

# change path to current file path

os.chdir(os.path.split(os.path.realpath(__file__))[0])

import sys

import json

import time

import zmq

def get_one_task():

"""get one task each time.

return a str.

#todo

return 'task a'

def main():

context = zmq.context()

# socket to send messages on

sender = context.socket(zmq.push)

sender.bind("tcp://*:5557")

# wait all clients connected

time.sleep(3)

# send tasks

taskn = 0

while true:

task = get_one_task()

print 'get task', str(task)

if task is none:

break

sender.send_string(str(task))

taskn += 1

# give 0mq time to deliver

print 'finish. task number:', taskn

if __name__ == '__main__':

main()

工作程序 worker.py

#!/usr/bin/python

#coding: utf-8

# task worker.

# connects pull socket to tcp://localhost:5557

# collects tasks from dispatcher via that socket

# connects push socket to tcp://localhost:5558

# sends results to collector via that socket

# author: wilzhang

# created: 2015-07-29

import os

# change path to current file path

os.chdir(os.path.split(os.path.realpath(__file__))[0])

import sys

import json

import time

import zmq

import random

import traceback

def do_work(task):

"""do task and return the result.

return a str.

#todo

return 'result'

def main():

context = zmq.context()

# socket to receive messages on

receiver = context.socket(zmq.pull)

receiver.connect("tcp://localhost:5557")

# socket to send messages to

sender = context.socket(zmq.push)

sender.connect("tcp://localhost:5558")

# process tasks forever

while true:

try:

print 'waiting work'

s = receiver.recv()

print 'receive msg', s

# do the work

result = do_work(s)

# send results to collector

if result:

sender.send(str(result))

print 'finish work'

except exception, e:

print traceback.format_exc()

continue

if __name__ == '__main__':

main()

匯**計程序 collector.py

#!/usr/bin/python

#coding: utf-8

# result collcetor.

# binds pull socket to tcp://localhost:5558

# collects results from workers via that socket

# author: wilzhang

# created: 2015-07-29

import os

# change path to current file path

os.chdir(os.path.split(os.path.realpath(__file__))[0])

import json

import time

import zmq

import random

import sys

def collect_result(result):

"""collect result and do some calculation.

#todo

print 'finsh n tasks, cost time: 3600s'

def main():

context = zmq.context()

# socket to receive messages on

receiver = context.socket(zmq.pull)

receiver.bind("tcp://*:5558")

# process

while true:

s = receiver.recv()

print 'receive msg', s

collect_result(s)

print 'finish'

if __name__ == '__main__':

main()

修改步驟:

實現task_dispatcher.py中的get_one_task函式,每次分配乙個任務

實現worker.py中的do_work函式,處理任務

實現collector.py中的collect_result函式,統計結果

啟動順序:

啟動collector.py

啟動n個worker.py

啟動task_dispatcher.py

如何在NEO共識節點間分配任務

任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...

如何在NEO共識節點間分配任務

任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...

如何在NEO共識節點間分配任務

任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...