有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。
現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。(redis或者mysql)
為空的記錄,需要寫入到日誌檔案中!
1. 使用多執行緒技術,每乙個執行緒讀取乙個txt檔案
2. 使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 2000條記錄
注意:開啟檔案操作,最好在乙個程序中,重複開啟檔案,會造成系統資源浪費!
#view code!/usr/bin/env python3
#coding: utf-8
"""多執行緒和協程配合使用示例
"""import
osimport
time
from gevent import
monkey;monkey.patch_all()
from gevent.pool import
pool
from functools import
partial
from multiprocessing import
process
coroutine_number = 2000 #
協程池數量
pool = pool(coroutine_number) #
使用協程池
#模擬資料倉儲,測試資料
data_dict =
class testprogram(object): #
測試程式
def__init__
(self):
self.base_dir = os.path.dirname(os.path.abspath(__file__)) #
專案根目錄
def write_log(self,number, content, colour='
white
', skip=false):
"""寫入日誌檔案
:param content: 寫入內容
:param colour: 顏色
:param skip: 是否跳過列印時間
:return:
"""#
顏色**
colour_dict =
choice = colour_dict.get(colour) #
選擇顏色
path = os.path.join(self.base_dir, "
output_%s.log
" % number) #
日誌檔案
with open(path, mode='
a+', encoding='
utf-8
') as f:
if skip is false: #
不跳過列印時間時
content = time.strftime('
%y-%m-%d %h:%m:%s
') + '
' +content
info = "
\033[1;{};1m{}\033[0m
".format(choice, content)
(info)
f.write(content + "\n"
)
defhas_null(self, key, number):
"""輸出key
:param key: 鍵值
:param number: 檔案標記
:return: bool
"""key =key.strip()
ifnot
data_dict.get(key):
self.write_log(number,
"錯誤,{} 記錄為空
".format(key),"
red"
)
return
false
(key)
return
true
defread_file(self, number):
"""讀取檔案
:param number: 檔案標記
:return:
"""file_name = os.path.join(self.base_dir, "
data
", "
%s.txt
" %number)
#print(file_name)
self.write_log(number, "
開始讀取檔案 {}
".format(file_name),"
green")
with open(file_name, encoding='
utf-8
') as f:
#使用協程池,執行任務。語法: pool.map(func,iterator)
#partial使用偏函式傳遞引數
#注意:has_null第乙個引數,必須是迭代器遍歷的值
pool.map(partial(self.has_null, number=number), f)
self.write_log(number,
"結束檔案讀取 {} 完成
".format(file_name),"
green")
return
true
defrun(self, number):
"""讀取指定的檔案,判斷每乙個key是否為空
:param number:
:return:
"""startime = time.time() #
開始時間
#清空日誌
path = os.path.join(self.base_dir, "
output_%s.log
" % number) #
日誌檔案
with open(path, mode='w'
) as f:
pass
self.read_file(number)
endtime =time.time()
take_time = endtime -startime
if take_time < 1: #
判斷不足1秒時
take_time = 1 #
設定為1秒
#計算花費時間
m, s = divmod(take_time, 60)
h, m = divmod(m, 60)
self.write_log(number,
"%s.txt 花費時間 %02d:%02d:%02d
" % (number,h, m, s),"
green")
defmain(self):
"""使用多執行緒執行程式
:return:
"""#
檔案標記列表
file_list = ["
7001
", "
7002
", "
7003"]
p_lst = #
執行緒列表
for i in
file_list:
#self.run(i)
p = process(target=self.run, args=(i,)) #
子程序呼叫函式
p.start() #
啟動子程序
將所有程序寫入列表中
for p in p_lst: p.join() #
檢測p是否結束,如果沒有結束就阻塞直到結束,否則不阻塞
testprogram().main()
#啟動主程式,它會開啟3個程序。
執行輸出:
肖祥 閱讀(
...)
編輯收藏
python 多程序和協程配合使用
有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。redis或者mysql 為空的記錄,需要寫入到日誌檔案中!1.使用多程序技術,每乙個程序讀取乙個txt檔案 2.使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 20...
python 多執行緒和協程速率測試對比
多執行緒和協程都屬於io密集型,我通過以下用例測試多執行緒和協程的實際速率對比。例項 通過socket客戶端以多執行緒併發模式請求不同伺服器端 這裡伺服器端分2種寫法 第一種伺服器通過協程實現,第二種伺服器通過多執行緒實現 的訪問速率 第一種伺服器端寫法 通過gevent實現 第二種伺服器端寫法 通...
Python的執行緒 程序和協程
程序 乙個程序就是乙個正在執行的程式,它是計cpu分配資源的最小單位。每個程序都有自己獨立的記憶體空間。能同時執行的程序數最多不超過核心數,也就是每個核心 同一時刻只能執行乙個程序。那麼多程序就是能 同時 執行多個程序 比如同時聽 和寫 這裡的 同時 可以指cpu通過極快地在程序間來回切換來實現,所...