應用場景
把乙個大任務分解成n個子任務並行執行,全部完成後可進行匯**計。
例如:訊息分發:將一條訊息傳送給100w使用者,可拆分成100個子任務,每個子任務傳送1w使用者;
資料處理:對1000w條資料記錄進行某種操作,可拆分成n個子任務並行執行;
特點一鍵執行,無須手動分解任務
支援動態加入新的worker程序
任務分發是負載均衡的
子任務完成後可進行匯**計
程序檢視
使用方法
任務呼叫器 task_dispatcher.py
#!/usr/bin/python
#coding: utf-8
# task dispatcher.
# binds push socket to tcp://localhost:5557
# sends batch of tasks to workers via that socket
# start after all workers started
# author: wilzhang
# created: 2015-07-29
import os
# change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])
import sys
import json
import time
import zmq
def get_one_task():
"""get one task each time.
return a str.
#todo
return 'task a'
def main():
context = zmq.context()
# socket to send messages on
sender = context.socket(zmq.push)
sender.bind("tcp://*:5557")
# wait all clients connected
time.sleep(3)
# send tasks
taskn = 0
while true:
task = get_one_task()
print 'get task', str(task)
if task is none:
break
sender.send_string(str(task))
taskn += 1
# give 0mq time to deliver
print 'finish. task number:', taskn
if __name__ == '__main__':
main()
工作程序 worker.py
#!/usr/bin/python
#coding: utf-8
# task worker.
# connects pull socket to tcp://localhost:5557
# collects tasks from dispatcher via that socket
# connects push socket to tcp://localhost:5558
# sends results to collector via that socket
# author: wilzhang
# created: 2015-07-29
import os
# change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])
import sys
import json
import time
import zmq
import random
import traceback
def do_work(task):
"""do task and return the result.
return a str.
#todo
return 'result'
def main():
context = zmq.context()
# socket to receive messages on
receiver = context.socket(zmq.pull)
receiver.connect("tcp://localhost:5557")
# socket to send messages to
sender = context.socket(zmq.push)
sender.connect("tcp://localhost:5558")
# process tasks forever
while true:
try:
print 'waiting work'
s = receiver.recv()
print 'receive msg', s
# do the work
result = do_work(s)
# send results to collector
if result:
sender.send(str(result))
print 'finish work'
except exception, e:
print traceback.format_exc()
continue
if __name__ == '__main__':
main()
匯**計程序 collector.py
#!/usr/bin/python
#coding: utf-8
# result collcetor.
# binds pull socket to tcp://localhost:5558
# collects results from workers via that socket
# author: wilzhang
# created: 2015-07-29
import os
# change path to current file path
os.chdir(os.path.split(os.path.realpath(__file__))[0])
import json
import time
import zmq
import random
import sys
def collect_result(result):
"""collect result and do some calculation.
#todo
print 'finsh n tasks, cost time: 3600s'
def main():
context = zmq.context()
# socket to receive messages on
receiver = context.socket(zmq.pull)
receiver.bind("tcp://*:5558")
# process
while true:
s = receiver.recv()
print 'receive msg', s
collect_result(s)
print 'finish'
if __name__ == '__main__':
main()
修改步驟:
實現task_dispatcher.py中的get_one_task函式,每次分配乙個任務
實現worker.py中的do_work函式,處理任務
實現collector.py中的collect_result函式,統計結果
啟動順序:
啟動collector.py
啟動n個worker.py
啟動task_dispatcher.py
如何在NEO共識節點間分配任務
任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...
如何在NEO共識節點間分配任務
任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...
如何在NEO共識節點間分配任務
任何計算機系統都有監控操作,可能會傳送心跳資訊 校驗和查詢及雜湊請求等。這些操作在本文中都被統稱為任務。在中心化系統中,通常會有乙個受認證的節點或節點群組來完成任務。而去中心化系統可以將任務下發給各個節點,從而靈活拓展,因此效率也顯然更高,但這也就導致了相應的問題 到底如何在所選節點間分配任務。我們...