用 python 實現的執行緒池
用 python 實現的執行緒池
軟體技術
lhwork 發表於 2007-2-1 8:53:26
為了提高程式的效率,經常要用到多執行緒,尤其是io等需要等待外部響應的部分。執行緒的建立、銷毀和排程本身是有代價的,如果乙個執行緒的任務相對簡單,那這些時間和空間開銷就不容忽視了,此時用執行緒池就是更好的選擇,即建立一些執行緒然後反覆利用它們,而不是在完成單個任務後就結束。
下面是用python實現的通用的執行緒池**:
undefined
view plain
copy to clipboard
print?
import
queue, threading, sys
from
threading
import thread
import
time,urllib
# working thread
class worker(thread):
worker_count = 0
def__init__( self, workqueue, resultqueue, timeout = 0, **kwds):
thread.__init__( self, **kwds )
self.id = worker.worker_count
worker.worker_count += 1
self.setdaemon( true )
self.workqueue = workqueue
self.resultqueue = resultqueue
self.timeout = timeout
def run( self ):
''' the get-some-work, do-some-work main loop of worker threads '''
while
true:
try:
callable, args, kwds = self.workqueue.get(timeout=self.timeout)
res = callable(*args, **kwds)
print "worker[%2d]: %s" % (self.id, str(res) )
self.resultqueue.put( res )
except
queue.empty:
break
except :
print 'worker[%2d]' % self.id, sys.exc_info()[:2]
class workermanager:
def__init__( self, num_of_workers=10, timeout = 1):
self.workqueue = queue.queue()
self.resultqueue = queue.queue()
self.workers =
self.timeout = timeout
self._recruitthreads( num_of_workers )
def _recruitthreads( self, num_of_workers ):
for i in
range( num_of_workers ):
worker = worker( self.workqueue, self.resultqueue, self.timeout )
def start(self):
for w in
self.workers:
w.start()
def wait_for_complete( self):
# ...then, wait for each of them to terminate:
while
len(self.workers):
worker = self.workers.pop()
worker.join( )
if worker.isalive() and
notself.workqueue.empty():
print "all jobs are are completed."
def add_job( self, callable, *args, **kwds ):
self.workqueue.put( (callable, args, kwds) )
def get_result( self, *args, **kwds ):
return
self.resultqueue.get( *args, **kwds )
worker類是乙個工作執行緒,不斷地從workqueue佇列中獲取需要執行的任務,執行之,並將結果寫入到resultqueue中,這裡的workqueue和resultqueue都是現成安全的,其內部對各個執行緒的操作做了互斥。當從workqueue中獲取任務超時,則執行緒結束。
workermanager負責初始化worker執行緒,提供將任務加入佇列和獲取結果的介面,並能等待所有任務完成。
undefined
view plain
copy to clipboard
print?
def test_job(id, sleep = 0.001 ):
try:
urllib.urlopen('').read()
except:
print '[%4d]' % id, sys.exc_info()[:2]
return
iddef test():
import
socket
socket.setdefaulttimeout(10)
print 'start testing'
wm = workermanager(10)
for i in
range(500):
wm.add_job( test_job, i, i*0.001 )
wm.start()
wm.wait_for_complete()
print 'end testing'
用python實現的執行緒池
python3標準庫里自帶執行緒池threadpoolexecutor 和程序池processpoolexecutor 當然也可以自己寫乙個threadpool。coding utf 8 import queue import threading import sys import time imp...
用python建立執行緒池
物件導向開發中,大家知道建立和銷毀物件是很費時間的,因為建立乙個物件要獲取記憶體資源或者其它更多資源。無節制的建立和銷毀執行緒是一種極大的浪費。那我們可不可以把執行完任務的執行緒不銷毀而重複利用呢?彷彿就是把這些執行緒放進乙個池子,一方面我們可以控制同時工作的執行緒數量,一方面也避免了建立和銷毀產生...
Python的執行緒池實現
實現 coding utf 8 import queue import threading import sys import time import urllib 替我們工作的執行緒池中的執行緒 class mythread threading.thread def init self,workq...