python 多程序和協程配合使用

2021-10-09 14:20:16 字數 3445 閱讀 3826

有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。

現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。(redis或者mysql)

為空的記錄,需要寫入到日誌檔案中!

1. 使用多程序技術,每乙個程序讀取乙個txt檔案

2. 使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 2000條記錄

注意:開啟檔案操作,最好在乙個程序中,重複開啟檔案,會造成系統資源浪費!

#!/usr/bin/env python3

# coding: utf-8

"""多執行緒和協程配合使用示例

"""import os

import time

from gevent import monkey;monkey.patch_all()

from gevent.pool import pool

from functools import partial

from multiprocessing import process

coroutine_number = 2000 # 協程池數量

pool = pool(coroutine_number) # 使用協程池

# 模擬資料倉儲,測試資料

data_dict =

class testprogram(object): # 測試程式

def __init__(self):

self.base_dir = os.path.dirname(os.path.abspath(__file__)) # 專案根目錄

def write_log(self,number, content, colour='white', skip=false):

"""寫入日誌檔案

:param content: 寫入內容

:param colour: 顏色

:param skip: 是否跳過列印時間

:return:

"""# 顏色**

colour_dict =

choice = colour_dict.get(colour) # 選擇顏色

path = os.path.join(self.base_dir, "output_%s.log" % number) # 日誌檔案

with open(path, mode='a+', encoding='utf-8') as f:

if skip is false: # 不跳過列印時間時

content = time.strftime('%y-%m-%d %h:%m:%s') + ' ' + content

info = "\033[1;{};1m{}\033[0m".format(choice, content)

print(info)

f.write(content + "\n")

def has_null(self, key, number):

"""輸出key

:param key: 鍵值

:param number: 檔案標記

:return: bool

"""key = key.strip()

if not data_dict.get(key):

self.write_log(number,"錯誤,{} 記錄為空".format(key),"red")

return false

print(key)

return true

def read_file(self, number):

"""讀取檔案

:param number: 檔案標記

:return:

"""file_name = os.path.join(self.base_dir, "data", "%s.txt" % number)

# print(file_name)

self.write_log(number, "開始讀取檔案 {}".format(file_name),"green")

with open(file_name, encoding='utf-8') as f:

# 使用協程池,執行任務。語法: pool.map(func,iterator)

# partial使用偏函式傳遞引數

# 注意:has_null第乙個引數,必須是迭代器遍歷的值

pool.map(partial(self.has_null, number=number), f)

self.write_log(number, "結束檔案讀取 {} 完成".format(file_name),"green")

return true

def run(self, number):

"""讀取指定的檔案,判斷每乙個key是否為空

:param number:

:return:

"""startime = time.time() # 開始時間

# 清空日誌

path = os.path.join(self.base_dir, "output_%s.log" % number) # 日誌檔案

with open(path, mode='w') as f:

pass

self.read_file(number)

endtime = time.time()

take_time = endtime - startime

if take_time < 1: # 判斷不足1秒時

take_time = 1 # 設定為1秒

# 計算花費時間

m, s = divmod(take_time, 60)

h, m = divmod(m, 60)

self.write_log(number, "%s.txt 花費時間 %02d:%02d:%02d" % (number,h, m, s),"green")

def main(self):

"""使用多執行緒執行程式

:return:

"""# 檔案標記列表

file_list = ["7001", "7002", "7003"]

p_lst = # 執行緒列表

for i in file_list:

# self.run(i)

p = process(target=self.run, args=(i,)) # 子程序呼叫函式

p.start() # 啟動子程序

for p in p_lst: p.join() # 檢測p是否結束,如果沒有結束就阻塞直到結束,否則不阻塞

testprogram().main() # 啟動主程式,它會開啟3個程序。

view code

執行輸出:

python 多程序和協程配合使用寫入資料

一 需求分析 有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。redis或者mysql 為空的記錄,需要寫入到日誌檔案中!任務分工 1.使用多程序技術,每乙個程序讀取乙個txt檔案 2.使用協程技術,批量讀取txt檔案記...

python 多執行緒和協程配合使用

有一批key已經寫入到3個txt檔案中,每乙個txt檔案有30萬行記錄。現在需要讀取這些txt檔案,判斷key是否在資料倉儲中。redis或者mysql 為空的記錄,需要寫入到日誌檔案中!1.使用多執行緒技術,每乙個執行緒讀取乙個txt檔案 2.使用協程技術,批量讀取txt檔案記錄。比如一次性讀取 ...

Python的執行緒 程序和協程

程序 乙個程序就是乙個正在執行的程式,它是計cpu分配資源的最小單位。每個程序都有自己獨立的記憶體空間。能同時執行的程序數最多不超過核心數,也就是每個核心 同一時刻只能執行乙個程序。那麼多程序就是能 同時 執行多個程序 比如同時聽 和寫 這裡的 同時 可以指cpu通過極快地在程序間來回切換來實現,所...