之前也寫過多執行緒的部落格,用的是 threading ,今天來講下 python 的另外乙個自帶庫 concurrent 。concurrent 是在 python3.2 中引入的,只用幾行**就可以編寫出執行緒池/程序池,並且計算型任務效率和 mutiprocessing.pool 提供的 poll 和 threadpoll 相比不分伯仲,而且在 io 型任務由於引入了 future 的概念效率要高數倍。而 threading 的話還要自己維護相關的佇列防止死鎖,**的可讀性也會下降,相反 concurrent 提供的執行緒池卻非常的便捷,不用自己操心死鎖以及編寫執行緒池**,由於非同步的概念 io 型任務也更有優勢。
concurrent 的確很好用,主要提供了 threadpoolexecutor 和 processpoolexecutor 。乙個多執行緒,乙個多程序。但 concurrent 本質上都是對 threading 和 mutiprocessing 的封裝。看它的原始碼可以知道,所以最底層並沒有非同步。
threadpoolexecutor xkywx自己提供了任務佇列,不需要自己寫了。而所謂的執行緒池,它只是簡單的比較當前的 threads 數量和定義的 max_workers 的大小,小於 max_workers 就允許任務建立執行緒執行任務。
通過 threadpoolexecutor 類建立執行緒池物件,max_workers 設定最大執行執行緒數數。使用 threadpoolexecutor 的好處是不用擔心執行緒死鎖問題,讓多執行緒程式設計更簡潔。
from concurrent import futures
pool = futures.threadpoolexecutor(max_workers = 2)
submit(self, fn, *args, **kwargs):
該方法的作用就是提交乙個可執行的**task,它返回乙個future物件。可以看出此方法不會阻塞主線程的執行。
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['','','']
pool = futures.threadpoolexecutor(max_workers = 2)
for url in urls:
task = pool.submit(get_request,url)
print('{}主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結果
2021-03-12 15:29:10.780141:主線程
2021-03-12 15:29:10.865425: 200
2021-03-12 15:29:10.923062: 200
2021-03-12 15:29:10.940930: 200
map(self, fn, *iterables, timeout=none, chunksize=1):
map 第二個引數是可迭代物件,比如 list、tuple 等,寫法相對簡單。map 方法也不會阻塞主線程的執行。
import requests,datetime,time
from c import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['','','']
pool = futures.threadpoolexecutor(max_workers = 2)
tasks = pool.map(get_request,urls)
print('{}:主線程程式設計客棧'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結果
2021-03-12 16:14:04.854452:主線程
2021-03-12 16:14:04.938870: 200
2021-03-12 16:14:05.033849: 200
2021-03-12 16:14:05.048952: 200
如果要等待子執行緒執行完之後再執行主線程要怎麼辦呢,可以通過 wait 。
wait(fs, timeout=none, return_when=all_completed):
import requests,datetime,time
from concurrent import futures
def get_request(url):
r = requests.get(url)
print('{}:{} {}'.format(datetime.datetime.now(),url,r.status_code))
urls = ['','','']
pool = futures.threadpoolexecutor(max_workers = 2)
tasks =
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
futures.wait(tasks)
print('{}:主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結果
2021-03-12 16:30:13.437042: 200
2021-03-12 16:30:13.552700: 200
2021-03-12 16:30:14.117325: 200
2021-03-12 16:30:14.118284:主線程
as_completed(fs, timeout=none)
使用 concurrent.futures 操作 多執行緒/多程序 過程中,很多函式報錯並不會直接終止程式,而是什麼都沒發生。使用 as_completed 可以捕獲異常,**如下
# 建立執行緒池
pool = futures.threadpoolexecutor(max_workers = 2)
tasks =
for url in urls:
task = pool.submit(get_request,url)
tasks.append(task)
# 異常捕獲
errors = futures.as_completed(tasks)
for error in errors:
# error.result() 等待子執行緒都完成,並丟擲異常,中斷主線程
# 捕獲子執行緒異常,不會終止主線程繼續執行
print(error.exception())
futures.wait(tasks)
print('{}:主線程'.format(datetime.datetime.now()))
time.sleep(2)
# 輸出結果
2021-03-12 17:24:26.994937:主線程
多程序程式設計也類似,將 threadpoolexecutor 替換成 processpoolexecutor 。
基於python的爬蟲
本次初學,參考的資料見 功能主要是抓取韓寒的部落格內容,以及儲存 到 hanhan的資料夾中,執行環境實在linux下的。見 具體 如何 usr bin env python coding utf 8 import urllib import time url 60 con urllib.urlop...
基於Python操作ElasticSearch
python 2.7 es依賴包 pyelasticsearch elasticsearch 5.5.1 6.0.1 作業系統 windows 10 centos 7 本文主要就es基本的crud操作做以歸納整理,es官方對python的依賴支援有很多,eg pyelasticsearch escl...
基於Python操作ElasticSearch
python 2.7 es依賴包 pyelasticsearch elasticsearch 5.5.1 6.0.1 作業系統 windows 10 centos 7 本文主要就es基本的crud操作做以歸納整理,es官方對python的依賴支援有很多,eg pyelasticsearch escl...