版本一:#!/usr/bin/env python
#-*- coding:utf-8 -*-
import
queue
import
threading
class
threadpool(object):
def__init__(self, max_num=20):
self.queue =queue.queue(max_num)
for i in
xrange(max_num):
self.queue.put(threading.thread)
defget_thread(self):
return
self.queue.get()
defadd_thread(self):
self.queue.put(threading.thread)
"""pool = threadpool(10)
def func(arg, p):
print arg
import time
time.sleep(2)
p.add_thread()
for i in xrange(30):
thread = pool.get_thread()
t = thread(target=func, args=(i, pool))
t.start()
"""
版本二:#!/usr/bin/env python
#-*- coding:utf-8 -*-
"""custom threadpool
how to use:
pool = threadpool(1)
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
def action(i):
pass
for i in range(20):
if pool.stop:
pool.terminal()
break
ret = pool.run(action, (i,), callback)
print 'end'
"""import
queue
import
threading
import
contextlib
stopevent =object()
class
threadpool(object):
def__init__
(self, max_num):
self.q =queue.queue(max_num)
self.max_num =max_num
self.cancel =false
self.generate_list =
self.free_list =
def run(self, func, args, callback=none):
"""執行緒池執行乙個任務
:param func: 任務函式
:param args: 任務函式所需引數
:param callback: 任務執行失敗或成功後執行的**函式,**函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為none,即:不執行**函式)
:return: 如果執行緒池已經終止,則返回true否則none
"""if
self.cancel:
return
true
if len(self.free_list) == 0 and len(self.generate_list) self.generate_thread()
w =(func, args, callback,)
self.q.put(w)
defgenerate_thread(self):
"""建立乙個執行緒
"""t = threading.thread(target=self.call)
t.start()
defcall(self):
"""迴圈去獲取任務函式並執行任務函式
"""current_thread =threading.currentthread
event =self.q.get()
while event !=stopevent:
func, arguments, callback =event
try:
result = func(*arguments)
success =true
except
exception, e:
success =false
result =none
if callback is
notnone:
try:
callback(success, result)
except
exception, e:
pass
with self.worker_state(self.free_list, current_thread):
event =self.q.get()
else
: self.generate_list.remove(current_thread)
defterminal(self):
"""終止執行緒池中的所有執行緒
"""self.cancel =true
full_size =len(self.generate_list)
while
full_size:
self.q.put(stopevent)
full_size -= [email protected]
defworker_state(self, state_list, worker_thread):
"""用於記錄執行緒中正在等待的執行緒數
"""
try:
yield
finally
: state_list.remove(worker_thread)
上下文管理:
自定義執行緒池
有些時候 jdk自帶的cachedthreadpool fixedthreadpool等執行緒池完成不了我們業務的需求時 可以用threadpoolexecutorg構造自定義的執行緒池。public class usethreadpoolexecutor1 這段 會首先執行任務1,然後把2 3 4...
自定義執行緒池
建立執行緒池方法 儘管executors提供了四種執行緒池建立的方式,但為了實現某些特定的需求,可以自己建立執行緒池。如在阿里的程式設計規範使用executors建立執行緒時,一般會報錯,並提示以下資訊 執行緒池不允許使用executors去建立,而是通過threadpoolexecutor的方式,...
自定義執行緒池
自定義執行緒池建立api 執行緒池建立通過juc的介面 executor 實現,平時我們使用其實現類 threadpoolexecutor 實現自定義執行緒池。常用建構函式 public threadpoolexecutor int corepoolsize,int maximumpoolsize,...