最近工作中經常會有讀取乙個檔案,對資料做相關處理並寫入到另外乙個檔案的需求
當檔案行數較少的時候,單程序順序讀取是沒問題的,但是當檔案行數過萬,就需要消耗很客觀的時間。
一、一次性讀入,多程序處理
我最初想到的辦法是多程序,最初的辦法是一次性讀取所有行,然後分配給多個程序處理,最終還是寫入乙個檔案。其中需要借助queue來實現對異常的捕獲和處理,不具有可擴充套件性。
同時一次性讀取乙個檔案,寫入記憶體也受到記憶體大小的限制。而且這種多程序情況下返回值的處理也比較麻煩。
**見python併發——多程序中的異常捕獲
二、多次讀入,並行處理
考慮到linux有乙個按行分割檔案的功能split,可以借助她實現資料平行計算,思路是這樣的,通過計算檔案的總行數,將檔案分割成行數相等的多個小檔案,小檔案個數可以大於或等於併發度。
開啟多程序對每個小檔案分別處理,每個小檔案處理完都輸出到一一對應的目標小檔案,最終將目標小檔案進行合併。
**如下:
from multiprocessing importpool
import
json
from time import
sleep
import
requests
import
ossrc_mid = '
_src_
'dst_mid = '
_dst_'#
業務邏輯處理
defget_jw(addr_name):
url = '
'result = requests.get(url.format(addr_name=addr_name))
result_str = str(result.content, encoding="
utf-8")
rj =json.loads(result_str)
if len(rj['
geocodes
']) >0:
jwd = rj['
geocodes
'][0]['
location']
(jwd)
return addr_name + '
,' + jwd + '\n'
else
:
print('
-,-'
)
return addr_name + '
,' + '
-,-' + '\n'
#輸入原始檔,返回分割後的源中間檔案list
class
parallelcompute(object):
def__init__(self, exe_func, source_file, target_file, concurrency=8):
self.exe_func =exe_func
self.source_file =source_file
self.target_file =target_file
self.concurrency =concurrency
self.abs_src_mid_dir =none
self.abs_dst_mid_dir =none
self.src_mid_file_list =none
self.dst_mid_file_list =none
#原始檔分割成多個小檔案
defsplit_file(self):
cur_path = os.path.abspath('.'
) self.abs_src_mid_dir =os.path.join(cur_path, src_mid)
self.abs_dst_mid_dir =os.path.join(cur_path, dst_mid)
os.mkdir(self.abs_src_mid_dir)
os.mkdir(self.abs_dst_mid_dir)
split_cmd = "
split /
".format(src_file=self.source_file,
abs_src_mid=self.abs_src_mid_dir)
(split_cmd)
os.system(split_cmd)
self.src_mid_file_list = [os.path.join(self.abs_src_mid_dir, it) for it in
os.listdir(self.abs_src_mid_dir)]
self.dst_mid_file_list = [src_file.replace(src_mid, dst_mid) for src_file in
self.src_mid_file_list]
#小檔案處理
deftranslate_file(self, src_file, dst_file):
with open(src_file, 'rb
') as f1, open(dst_file, 'a'
) as f2:
line =f1.readline().strip()
line = str(line, encoding='
utf8')
while
line:
try:
jw =self.exe_func(line)
f2.write(jw)
except
exception:
sleep(5)
offset = len(line.encode('
utf8
')) + 1f1.seek(-offset, 1)
line =f1.readline().strip()
line = str(line, encoding='
utf8')
#小檔案合併
defmerge_files(self):
with open(self.target_file, 'a
') as f2:
for dst_m_file in
self.dst_mid_file_list:
with open(dst_m_file, 'r
') as f1:
line =f1.readline()
while
line:
f2.write(line)
line =f1.readline()
#清理中間檔案
defdelete_mid_dir(self):
os.system(
'rm -rf %s
' %self.abs_src_mid_dir)
os.system(
'rm -rf %s
' %self.abs_dst_mid_dir)
defexecute(self):
p =pool(self.concurrency)
self.split_file()
for src_mid_file in
self.src_mid_file_list:
dst_mid_file =src_mid_file.replace(src_mid, dst_mid)
p.close()
p.join()
self.merge_files()
self.delete_mid_dir()
if__name__ == '
__main__':
source_file = '
/opt/test/qiuxue/target.txt
'target_file = '
/opt/test/qiuxue/result3.txt
'pc =parallelcompute(get_jw, source_file, target_file)
pc.execute()
這樣就做到了平行計算和業務邏輯的分離,簡化了呼叫者的使用難度
python對檔案進行平行計算初探 二)
上次的平行計算是通過將大檔案分割成小檔案,涉及到檔案分割,其實更有效的方法是在記憶體中對檔案進行分割,分別計算 最後將返回結果直接寫入目標檔案,省去了分割小檔案合併小檔案刪除小檔案的過程 如下 import json import math from multiprocessing import p...
python平行計算 python平行計算
0.基礎並行 發 multiprocessing threading 1.concurrent 2.併發 asynico 3.ipython下的平行計算 使用ipyparallel庫的ipython提供了前所未有的能力,將科學python的探索能力與幾乎即時訪問多個計算核心相結合。系統可以直觀地與本...
平行計算模型
平行計算模型通常指從並行演算法 的設計和分析出發,將各種並行計算機 至少某一類並行計算機 的基本特徵抽象出來,形成乙個抽象的計算模型。從更廣的意義上說,平行計算模型為平行計算提供了硬體和軟體介面 在該介面的約定下,並行系統硬體設計者和軟體設計 者可以開發對並行性 的支援機制,從而提高系統的效能。有幾...