簡單的執行緒池的實現:
import queue
import threading
import time
class threadpool(object):
def __init__(self, max_num=20):
self.queue = queue.queue(max_num)
for i in range(max_num):
self.queue.put(threading.thread)
def get_thread(self):
return self.queue.get()
def add_thread(self):
self.queue.put(threading.thread)
def func(arg, p):
print(arg)
time.sleep(2)
p.add_thread()
if __name__ == '__main__':
pool = threadpool(10)
for i in range(30):
thread = pool.get_thread()
t = thread(target=func, args=(i, pool))
t.start()
複雜版本的實現:
#!/usr/bin/env python3
import queue
import threading
import contextlib
import time
stopevent = object()
class threadpool(object):
def __init__(self, max_num, max_task_num = none):
if max_task_num:
self.q = queue.queue(max_task_num)
else:
self.q = queue.queue()
self.max_num = max_num
self.cancel = false
self.terminal = false
self.generate_list =
self.free_list =
def run(self, func, args, callback=none):
"""執行緒池執行乙個任務
:param func: 任務函式
:param args: 任務函式所需引數
:param callback: 任務執行失敗或成功後執行的**函式,**函式有兩個引數1、任務函式執行狀態;2、任務函式返回值(預設為none,即:不執行**函式)
:return: 如果執行緒池已經終止,則返回true否則none
"""if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""建立乙個執行緒
"""t = threading.thread(target=self.call)
t.start()
def call(self):
"""迴圈去獲取任務函式並執行任務函式
"""current_thread = threading.currentthread
event = self.q.get()
while event != stopevent:
func, arguments, callback = event
try:
result = func(*arguments)
success = true
except exception as e:
success = false
result = none
if callback is not none:
try:
callback(success, result)
except exception as e:
pass
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = stopevent
else:
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""執行完所有的任務後,所有執行緒停止
"""self.cancel = true
full_size = len(self.generate_list)
while full_size:
self.q.put(stopevent)
full_size -= 1
def terminate(self):
"""無論是否還有任務,終止執行緒
"""self.terminal = true
while self.generate_list:
self.q.put(stopevent)
self.q.empty()
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""用於記錄執行緒中正在等待的執行緒數
"""try:
yield
finally:
state_list.remove(worker_thread)
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
def action(i):
print(i)
if __name__ == '__main__':
pool = threadpool(5)
for i in range(30):
pool.run(action, (i,), callback)
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()
python第十一天
函式總結 def func a,b print a,b return a b 四個組成部分 函式名 呼叫函式的依據 函式體 執行函式邏輯的 引數列表 為函式提供內部資源 返回值 將函式執行結果返回給外界 返回值 1 空返回 沒有return或空return 2 一鍵返回 3 多值返回 裝有多個值的元...
Python 集合 第十一天
集合 set 是乙個無序的不重複元素序列。可以使用大括號 或者 set 函式建立集合,注意 建立乙個空集合必須用set 而不是 因為 是用來建立乙個空字典。建立方式 parame 或者set value 給個例項 語法格式如下 s.add x 將元素 x 新增到集合 s 中,如果元素已存在,則不進行...
UnixC第十一天
回憶昨天內容 一 訊號阻塞 sigprocmask 2 sigset t 訊號阻塞和訊號忽略的區別 可靠訊號 不可靠 訊號丟失 二 獲取程序的未決訊號集 從未決訊號集中找未決訊號 sigpending 2 什麼是未決訊號?三 訊號從產生到處理的整個過程 四 system v ipc 訊息佇列 獲取乙...