要實現上面的功能,建立分布式程序需要分為六個步驟:
接下來通過程式實現上面的例子(linux版本),首先編寫的是服務程序。
**如下:
# -*- coding:utf-8 -*-
''' @author liuyazhuang
@date 2018/10/14 10:18
@description 分布式服務程序linux版
@version 1.0.0
'''import random, time, queue
from multiprocessing.managers import basemanager
#第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue = queue.queue()
result_queue = queue.queue()
class queuemanager(basemanager):
pass
#第二步:把建立的兩個佇列註冊在網路上,利用register方法,callable引數關聯了queue物件,將queue物件在網路中暴露
queuemanager.register('get_task_queue', callable=lambda: task_queue)
queuemanager.register('get_result_queue', callable=lambda: result_queue)
#第三步:繫結埠8001,設定驗證口令'lyz',這個相當於物件的初始化
manager = queuemanager(address=('', 8001), authkey='lyz')
#第四步:啟動管理,監聽資訊通道
manager.start()
#第五步:通過管理例項的方法獲得通過網路訪問的queue物件
task = manager.get_task_queue()
result = manager.get_result_queue()
#第六步:新增任務
for url in ["imageurl_" + i for i in range(10)]:
print 'put task %s ...' % url
task.put(url)
#獲取返回結果
print 'try get result...'
for i in range(10):
print 'result is %s' % result.get(timeout=10)
#關閉管理
manager.shutdown()
服務程序已經編寫完成,接下來編寫任務程序,建立任務程序分為四個步驟:
**(win/linux版本)如下:
# -*- coding:utf-8 -*-
''' @author liuyazhuang
@date 2018/10/14 10:44
@description 分布式任務程序win/linux版本
@version 1.0.0
'''import time
from multiprocessing.managers import basemanager
#建立queuemanager
class queuemanager(basemanager):
pass
#第一步:使用queuemanager註冊用於獲取queue的方法名稱
queuemanager.register('get_task_queue')
queuemanager.register('get_result_queue')
#第二步:連線到伺服器
server_addr = '127.0.0.1'
print 'connect to server %s...' % server_addr
#埠和驗證口令注意保持與服務程序完全一致
m = queuemanager(address=(server_addr, 8001), authkey='lyz')
#從網路連線
m.connect()
#第三步:獲取queue的物件
task = m.get_task_queue()
result = m.get_result_queue()
#第四步:從task佇列獲取任務,並把結果寫入result佇列
while(not task.empty()):
image_url = task.get(true, timeout = 5)
print 'run task download %s...' % image_url
time.sleep(1)
result.put('%s------->success' % image_url)
#處理結束
print 'worker exit.'
由於平台的特性,建立服務程序的**在linux和windows上有一些不同,建立任務程序的**是一致的。
windows上服務程序**如下:
# -*- coding:utf-8 -*-
''' @author liuyazhuang
@date 2018/10/14 10:55
@description 分布式服務程序windows版
@version 1.0.0
'''import queue
from multiprocessing.managers import basemanager
from multiprocessing import freeze_support
#任務個數
task_num = 10
#定義收發佇列
task_queue = queue.queue(task_num)
result_queue = queue.queue(task_num)
def get_task():
return task_queue
def get_result():
return result_queue
#建立queuemanager
class queuemanager(basemanager):
pass
def win_run():
#windows下繫結呼叫介面不能用lambda,所以只能先定義函式再繫結
queuemanager.register('get_task_queue', callable=get_task)
queuemanager.register('get_result_queue', callable=get_result)
#繫結埠並設定驗證口令,windows下需要填寫ip位址,linux下不填預設為本地
manager = queuemanager(address=('127.0.0.1', 8001), authkey='lyz')
#啟動manager.start()
try:
#通過網路獲取任務佇列和結果佇列
task = manager.get_task_queue()
result = manager.get_result_queue()
#新增任務
for url in ["imageurl_" + str(i) for i in range(10)]:
print 'put task %s ...' % url
task.put(url)
print 'try get result ...'
for i in range(10):
print 'result is %s' % result.get(timeout = 10)
except:
print 'manager error'
finally:
#一定要關閉,否則會報管道未關閉的錯誤
manager.shutdown
if __name__ == '__main__':
#windows下多程序可能會有問題,新增這句可以緩解
freeze_support()
win_run()
python之分布式多程序通訊
首先實現跨平台的多程序通訊,熟悉生產者 緩衝區 消費者模型,我們都知道產生資料模組為生產者,而處理資料模組稱為消費者,生產者與消費者之間的中介稱之為緩衝區。在多程序開發中,生產者就是生產資料的程序,消費者就是消費資料的程序,如果生產者與消費者程序速度不一致,就會造成等待現象,而為了解決這種生產 消費...
分布式之分布式事務
被人問到分布式事務,之前學rabbitmq 的時候學到過rabbitmq 高階的事務,因為沒有用過,所有沒有回答好。這裡總結一下。1.單機版事務。事務的四大特性 acid a.原子性 b.一致性 c.隔離性 d.永續性 單機事務可以通過設定事務的隔離級別 參見spring 的事務隔離級別 2.分布式...
Python學習筆記之分布式程序任務管理器
我們使用多程序去完成多個任務時會使得我們的工作效率大大提公升,這都是在同一臺電腦上執行的,畢竟一台計算機的資源是有限的,但是當我們將多台計算機使用網路協議同時去處理多個相關聯任務時,我們可用的資源幾乎是無限的。因此我們出現了分布式程序管理。python中我們前面學習了使用multiprocessin...