有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。
現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。(redis或者mysql)
為空的記錄,需要寫入到日誌檔案中!
1. 使用多程序技術,每乙個程序讀取乙個txt檔案
2. 使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 2000條記錄
注意:開啟檔案操作,最好在乙個程序中,重複開啟檔案,會造成系統資源浪費!
#!/usr/bin/env python3
# coding: utf-8
"""多執行緒和協程配合使用示例
"""import os
import time
from gevent import monkey;monkey.patch_all()
from gevent.pool import pool
from functools import partial
from multiprocessing import process
coroutine_number = 2000 # 協程池數量
pool = pool(coroutine_number) # 使用協程池
# 模擬資料倉儲,測試資料
data_dict =
class testprogram(object): # 測試程式
def __init__(self):
self.base_dir = os.path.dirname(os.path.abspath(__file__)) # 專案根目錄
def write_log(self,number, content, colour='white', skip=false):
"""寫入日誌檔案
:param content: 寫入內容
:param colour: 顏色
:param skip: 是否跳過列印時間
:return:
"""# 顏色**
colour_dict =
choice = colour_dict.get(colour) # 選擇顏色
path = os.path.join(self.base_dir, "output_%s.log" % number) # 日誌檔案
with open(path, mode='a+', encoding='utf-8') as f:
if skip is false: # 不跳過列印時間時
content = time.strftime('%y-%m-%d %h:%m:%s') + ' ' + content
info = "\033[1;{};1m{}\033[0m".format(choice, content)
print(info)
f.write(content + "\n")
def has_null(self, key, number):
"""輸出key
:param key: 鍵值
:param number: 檔案標記
:return: bool
"""key = key.strip()
if not data_dict.get(key):
self.write_log(number,"錯誤,{} 記錄為空".format(key),"red")
return false
print(key)
return true
def read_file(self, number):
"""讀取檔案
:param number: 檔案標記
:return:
"""file_name = os.path.join(self.base_dir, "data", "%s.txt" % number)
# print(file_name)
self.write_log(number, "開始讀取檔案 {}".format(file_name),"green")
with open(file_name, encoding='utf-8') as f:
# 使用協程池,執行任務。語法: pool.map(func,iterator)
# partial使用偏函式傳遞引數
# 注意:has_null第乙個引數,必須是迭代器遍歷的值
pool.map(partial(self.has_null, number=number), f)
self.write_log(number, "結束檔案讀取 {} 完成".format(file_name),"green")
return true
def run(self, number):
"""讀取指定的檔案,判斷每乙個key是否為空
:param number:
:return:
"""startime = time.time() # 開始時間
# 清空日誌
path = os.path.join(self.base_dir, "output_%s.log" % number) # 日誌檔案
with open(path, mode='w') as f:
pass
self.read_file(number)
endtime = time.time()
take_time = endtime - startime
if take_time < 1: # 判斷不足1秒時
take_time = 1 # 設定為1秒
# 計算花費時間
m, s = divmod(take_time, 60)
h, m = divmod(m, 60)
self.write_log(number, "%s.txt 花費時間 %02d:%02d:%02d" % (number,h, m, s),"green")
def main(self):
"""使用多執行緒執行程式
:return:
"""# 檔案標記列表
file_list = ["7001", "7002", "7003"]
p_lst = # 執行緒列表
for i in file_list:
# self.run(i)
p = process(target=self.run, args=(i,)) # 子程序呼叫函式
p.start() # 啟動子程序
for p in p_lst: p.join() # 檢測p是否結束,如果沒有結束就阻塞直到結束,否則不阻塞
testprogram().main() # 啟動主程式,它會開啟3個程序。
view code
執行輸出:
python 多程序和協程配合使用寫入資料
一 需求分析 有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。redis或者mysql 為空的記錄,需要寫入到日誌檔案中!任務分工 1.使用多程序技術,每乙個程序讀取乙個txt檔案 2.使用協程技術,批量讀取txt檔案記...
python 多執行緒和協程配合使用
有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。redis或者mysql 為空的記錄,需要寫入到日誌檔案中!1.使用多執行緒技術,每乙個執行緒讀取乙個txt檔案 2.使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 ...
Python的執行緒 程序和協程
程序 乙個程序就是乙個正在執行的程式,它是計cpu分配資源的最小單位。每個程序都有自己獨立的記憶體空間。能同時執行的程序數最多不超過核心數,也就是每個核心 同一時刻只能執行乙個程序。那麼多程序就是能 同時 執行多個程序 比如同時聽 和寫 這裡的 同時 可以指cpu通過極快地在程序間來回切換來實現,所...