分布式程序是將process程序分布到多台伺服器中,利用多台機器的效能完成複雜的任務。可以應用到分布式爬蟲的開發中。分布式程序在python中依然要用到multiprocess模組。它不但支援多程序,其中managers子模組還支援吧多程序分不到多台機器上,可以寫乙個服務程序作為排程者,將任務分不到其他多個程序中,依靠網路通訊進行管理。
例:分布式:
主要問題就是講queue暴露在網路中,讓其他機器的程序都可以訪問,分布式程序就是將這一過程進行了封裝,可以將之成為本佇列的網路化。
建立分布式程序需要的六個步驟:
1、建立佇列queue,用來程序之間的通訊:
服務程序建立任務佇列task_queue,用來作為傳遞惹怒給任務程序的通道。在分布式多程序環境下必須通過由queuemanager
獲得queue介面來新增任務。
2、把第一步中建立的佇列在網路上註冊,暴露給其他程序(主機),註冊後獲得網路佇列,相當於本地佇列的映像。
3、建立乙個物件(queuemanager(basemanage))例項manager,繫結埠和驗證口令。
4、啟動第三部中建立的例項,即啟動管理manager,監管通訊通道。
5、通過管理例項的方法獲得通過網路訪問queue物件,即再把網路佇列例項化成可已使用的本地佇列。
6、建立人物到"本地"佇列中,自動上傳人物到網路佇列中,分配給任務程序進行處理。
python版本:3.5
系統:mac os
taskmanager.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
__author__ = '
fade zhao'#
服務程序(taskmanager.py)
#匯入隨機,時間,佇列
from multiprocessing.managers import
basemanager
import
time,random,queue
##實現第一步:建立task_queue和result_queue,用來存放任務和結果
task_queue =queue.queue()
result_queue =queue.queue()
class
queuemanager(basemanager):
pass
#實現第二步:把建立的兩個佇列註冊在網路上,利用register方法,callable引數關聯了queue物件,
#將queue物件在網路中暴露
queuemanager.register('
get_task_queue
',callable=lambda
:task_queue)
queuemanager.register(
'get_result_queue
',callable=lambda
:result_queue)
#實現第三步:繫結埠8001,設定驗證口令b『abc』(是b格式)。這個相當於物件的初始化
manager=queuemanager(address=('
127.0.0.1
',1234),authkey=b'
abc',)#
實現第四步:啟動管理,監聽資訊通道
manager.start()
#實現第五步:通過管理例項的方法獲得通過網路訪問的queue物件
task=manager.get_task_queue()
result=manager.get_result_queue()
#實現第六步:新增任務
for url in ["
imageurl_
"+str(i) for i in range(10)]:
print('
put task %s ...
' %url)
task.put(url)
#獲取返回結果
print('
try get result...')
for i in range(10):
print('
result is %s
' %result.get(timeout=10))
#關閉管理
manager.shutdown()
taskworker.py
#!/usr/bin/env python
#-*- coding: utf-8 -*-
__author__ = '
fade zhao
'import
time
from multiprocessing.managers import
basemanager
#建立類似的queuemanager:
class
queuemanager(basemanager):
pass
#實現第一步:使用queuemanager註冊獲取queue的方法名稱
queuemanager.register('
get_task_queue')
queuemanager.register(
'get_result_queue')
#實現第二步:連線到伺服器:
server_addr = '
127.0.0.1
'print('
connect to server %s...
' %server_addr)
#埠和驗證口令注意保持與服務程序設定的完全一致:
m = queuemanager(address=(server_addr, 1234), authkey=b'
abc')#
從網路連線:
m.connect()
#實現第三步:獲取queue的物件:
task =m.get_task_queue()
result =m.get_result_queue()
#實現第四步:從task佇列取任務,並把結果寫入result佇列:
while(not
task.empty()):
image_url = task.get(true,timeout=5)
print('
run task download %s...
' %image_url)
time.sleep(1)
result.put(
'%s--->success
'%image_url)
結果:
taskmanager>>>connect to server 127.0.0.1...run task download imageurl_0...
run task download imageurl_1...
run task download imageurl_2...
run task download imageurl_3...
run task download imageurl_4...
run task download imageurl_5...
run task download imageurl_6...
run task download imageurl_7...
run task download imageurl_8...
run task download imageurl_9...
taskworker>>>connect to server 127.0.0.1...
run task download imageurl_0...
run task download imageurl_1...
run task download imageurl_2...
run task download imageurl_3...
run task download imageurl_4...
run task download imageurl_5...
run task download imageurl_6...
run task download imageurl_7...
run task download imageurl_8...
run task download imageurl_9...
注意,當我們在一台機器上寫多程序程式時,建立的queue可以直接拿來用,但是,在分布式多程序環境下,新增任務到queue不可以直接對原始的task_queue進行操作,那樣就繞過了queuemanager的封裝,必須通過manager.get_task_queue()獲得的queue介面新增。
這個簡單的manager/worker模型有什麼用?其實這就是乙個簡單但真正的分布式計算,把**稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,比如換成傳送郵件,就實現了郵件佇列的非同步傳送。
其中的queue是在taskmanage程序中被建立註冊到網路中,而taskworker通過網路獲取queue。
authkey 為了保證兩台機器正常通訊,不被其他機器惡意干擾。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定連線不上。
python 分布式程序
process可以分布到多台機器上,而thread最多只能分布到同一臺機器的多個cpu上。python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分布到多台機器上。乙個服務程序可以作為排程者,將任務分布到其他多個程序中,依靠網路通訊。由於manager...
python 學習 分布式程序
伺服器端 import random,time,queue from multiprocessing.managers import basemanager 傳送任務的佇列 task queue queue.queue 接收結果的佇列 result queue queue.queue class q...
Python之 分布式程序
要實現上面的功能,建立分布式程序需要分為六個步驟 接下來通過程式實現上面的例子 linux版本 首先編寫的是服務程序。如下 coding utf 8 author liuyazhuang date 2018 10 14 10 18 description 分布式服務程序linux版 version ...