Python 學習筆記 執行緒池

2021-09-04 06:34:56 字數 4295 閱讀 8816

前面我們學校裡如何建立多執行緒,當我們接到乙個新的請求時,會建立乙個執行緒,執行完畢之後又銷毀掉這個執行緒。對於一些數目巨大,但是單個快速執行的任務,每個任務真正執行消耗的時間和執行緒建立銷毀的時間可能都差不多。這樣一來,執行緒的效率浪費的比較嚴重。因此可以考慮使用執行緒池的技術,預先建立一些空閒的執行緒,當他們接收具體任務的時候,就去直接執行了,執行完了也不銷毀,接著執行下乙個任務。

python裡面,因為暫時還沒有功能完備的執行緒池,因此這部分功能需要自己實現。

實現的基本原理是建立乙個佇列,把填充的任務資料乙個個地塞進去;然後建立乙個執行緒池,執行緒池裡面的執行緒不斷地讀取佇列裡面的任務資料,執行任務,直到所有的任務都完成。

下面是我寫的模擬fabric批量遠端操作的部分**,paramiko模組可以讓我遠端的進行ssh連線;對於每個連線,我嘗試使用了執行緒池。

#!/usr/bin/env python

# -*- coding:utf-8 -*-

# author yuan li

""" 遠端命令的執行,使用了執行緒池的技術,因為執行的時間比較少,而執行緒本身執行的時間佔的比重比較大;

"""import threading

import queue

import time

import paramiko

import os

#找到相對路徑

parent_path = os.path.abspath(os.pardir)

db_path=os.path.join(parent_path,'db')

#乙個管理類,基本思路是把任務和相關的引數填充到佇列(任務池)中,然後建立乙個程序池,裡面的程序迴圈地讀取任務池裡面的內容,任何執行其中的內容,直到所有任務全部實現。

class workmanager(object):

#建構函式

def __init__(self,cmd,username,password,work_num=1000,thread_num=2,):

""":param cmd:遠端命令

:param username: 使用者名稱

:param password: 密碼

:param work_num: 任務池(佇列大小)

:param thread_num: 執行緒池大小

"""self.cmd=cmd

self.work_num=work_num

self.thread_num=thread_num

self.queue=queue.queue()

self.threads=

self.init_task(work_num,cmd,username,password)

self.init_threadpool(thread_num)

#初始化任務池

def init_task(self,num,inp,username,password):

for i in range(num):

self.add_job(do_job,i,inp,username,password)

#新增任務到任務池

def add_job(self,job,*args):

#填充任務到任務池,每乙個任務是乙個元祖(任務,引數列表)

self.queue.put((job,list(args)))

#初始化執行緒池

def init_threadpool(self,num):

for i in range(num):

#等待掛起主線程

def wait_allcomplete(self):

for item in self.threads:

if item.isalive():

item.join()

#執行緒類,每個執行緒迴圈地去任務池取任務

class work(threading.thread):

def __init__(self,que):

super(work, self).__init__()

self.queue=que

self.start()

def run(self):

while true:

try:

#當任務池為空的時候,強制報錯,退出

do,args=self.queue.get(block=false)

# print(do,args)

do(args[0],args[1],args[2],args[3])

#確保佇列裡面的任務都完成了

self.queue.task_done()

except:

break

#初始化的乙個主機組,測試用的

hosts=['anoble-ise','bberry-ise','blackbr-ise','jlau-ise','kwood-ise','marwa-ise','smaroo-ise','psekarwin-ise','spare2-ise']

#遠端連線ssh並且執行命令

def do_job(args,inp,username,password):

""":param args: hosts列表的索引

:param inp: 遠端命令

:param username: 使用者名稱

:param password: 密碼

:return:

"""# time.sleep(0.1)

ssh = paramiko.sshclient()

ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())

ssh.connect(hosts[args], 22, username, password)

# 執行命令測試

stdin, stdout, stderr = ssh.exec_command(inp)

for line in stdout.readlines():

print(line.strip())

print(("\x1b[5;19;32m  %s \x1b[0m" % hosts[args]).center(40,'*'))

print("\n")

#選擇主機組

username=""

password=""

#入口檔案

def display():

global hosts,username,password

msg="""

歡迎使用fabric模擬程式,您可以執行以下操作

1.顯示主機組

2.批量執行遠端命令

3.批量上傳

5.輸入管理員賬號

6.退出

"""while true:

print(msg)

inpt=input("請輸入選項")

#輸出主機組的相關資訊

if inpt=='1':pass

#遠端批量操作

elif inpt=='2':

# username=input("使用者名稱")

# password=input("密碼")

if not username:

print("請先配置登入賬號資訊")

else:

while true:

inp = input("輸入指令(q返回上級目錄)\n>>>")

if inp =='q':break

if not inp:

print("不能輸入空命令")

else:

start = time.time()

#指定命令,使用者名稱,密碼,任務池(佇列)的大小,和執行緒的個數)

work_manager = workmanager(inp,username,password, len(hosts), 2)

work_manager.wait_allcomplete()

end = time.time()

print("cost time is %s" % (end - start))

#建立批量上傳的多執行緒

elif inpt=='3':

pass

elif inpt=='4':

pass

elif inpt=='5':

username = input("使用者名稱")

password = input("密碼")

elif inpt=='6':

exit("退出程式")

else:

print("無效輸入,請重試")

if __name__ == '__main__':

display()

執行緒池學習筆記

記錄一下學習執行緒池的過程,用到的函式歸結 pthread mutex lock pthread mutex unlock pthread cond wait pthread cond signal pthread cond broadcast pthread create pthread join...

學習Python執行緒池

from concurrent.futures import threadpoolexecutor,as completed import time deftest1 num lit1 for i in range num time.sleep 3 return lit1 deftest2 num ...

Python筆記 五 Python執行緒池

目前專案中有個工作是使用python定時處理資料庫中的任務,之前是每個任務都起乙個執行緒進行處理,隨著任務數的增多,起的執行緒也越來越多,最終出現記憶體溢位情況。python的執行緒池實現 import queue import threading import sys import time im...