環境:win7 x64,python 2.7,apscheduler 2.1.2。
原理圖如下:
**部分:
(1)、中心節點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心節點(主要功能是分配任務)
import socketserver, socket, queue
centerip = '127.0.0.1' #中心節點ip
centerlistenport = 9999 #中心節點監聽埠
centerclient = socket.socket(socket.af_inet, socket.sock_dgram) #中心節點用於傳送網路訊息的socket
taskqueue = queue.queue() #任務佇列
#獲取任務佇列
def gettaskqueue():
for i in range(1, 11):
taskqueue.put(str(i))
#centerserver的**函式,在接受到udp報文是觸發
class myudphandler(socketserver.baserequesthandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1www.cppcns.com]
print(data)
if data.startswith('wait'):
vec = data.split(':')
if len(vec) != 3:
print('error: len(vec) != 3')
else:
nodeip = vec[1]
nodelistenport = vec[2]
nodeid = nodeip + ':' + nodelistenport
if not taskqueue.empty():
task = taskqueue.get()
程式設計客棧 print('send task ' + task + ' to ' + nodeid)
centerclient.sendto('task:' + task, (nodeip, int(nodelistenport)))
else:
print('taskqueue is e')
gettaskqueue() #獲取任務佇列
centerserver = socketserver.udpserver((centerip, centerlistenport), myudphandler)
print('listen port ' + str(centerlistenport) + ' ...')
centerserver.serve_forever()
(2)、任務節點:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任務節點(請求/接收/執行任務)
import time, socket, socketserver
from apscheduler.scheduler import scheduler
centerip = '127.0.0.1' #中心節點ip
cen程式設計客棧terlistenport = 9999 #中心節點監聽埠
nodeip = socket.gethostbyname(socket.gethostname()) #任務節點自身ip
nodeclient = socket.socket(socket.af_inet, sock程式設計客棧et.sock_dgram) #任務節點用於傳送網路訊息的socket
#任務:傳送網路資訊
def jobsendnetmsg():
msg = ''
if nodeserver.taskstate == 'wait':
msg = 'wait:' + nodeip + ':' + str(nodelistenport)
elif nodeserver.taskstate == 'exec':
msg = 'exec:' + nodeip + ':' + str(nodelistenport)
print(msg)
nodeclient.sendto(msg, (centerip, centerlistenport))
#新增並啟動定時任務
def inittimer():
sched = scheduler()
sched.add_interval_job(jobsendnetmsg, seconds=1)
sched.start()
#執行任務
def exectask(task):
print('exectask ' + task + ' ...')
time.sleep(2)
print('exectask ' + task + ' over')
#nodeserver的**函式,在接受到udp報文是觸發
class myudphandler(socketserver.baserequesthandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print('recv data: ' + data)
if data.startswith('task'):
vec = data.split(':')
if len(vec) != 2:
print('error: len(vec) != 2')
else:
task = vec[1]
self.server.taskstate = 'exec'
exectask(task)
self.server.taskstate = 'wait'
inittimer()
nodeserver = socketserver.udpserver(('', 0), myudphandler)
nodeserver.taskstate = 'wait' #(exec/wait)
nodelistenport = nodeserver.server_address[1]
print('nodelistenport:' + str(nodelistenport))
nodeserver.serve_forever()
python分布式架構 分布式架構
1.分布式架構 採用centos mongodb windows2012 python redis進行分布式架構搭建,mongodb的框架最核心的設計就是 mongodb和mapreduce。mongodb為海量的資料提供了儲存,則mapreduce為海量的資料提供了計算,windows2012作為...
python 分布式程序
process可以分布到多台機器上,而thread最多只能分布到同一臺機器的多個cpu上。python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分布到多台機器上。乙個服務程序可以作為排程者,將任務分布到其他多個程序中,依靠網路通訊。由於manager...
Python 分布式程序
分布式程序是將process程序分布到多台伺服器中,利用多台機器的效能完成複雜的任務。可以應用到分布式爬蟲的開發中。分布式程序在python中依然要用到multiprocess模組。它不但支援多程序,其中managers子模組還支援吧多程序分不到多台機器上,可以寫乙個服務程序作為排程者,將任務分不到...