分析结论
多进程可以实现逐行遍历同一个文件(可以保证每一个进程遍历的行都是完整且互不重复的),且可以提高遍历性能。
多进程 / 多线程遍历文件速度
- 单进程、多线程读取同一个文件时,每个线程的运行时间并不能随线程数的增加的降低;
- 多进程读取同一个文件时,每个进程的运行时间随线程数的增加而降低。
进一步优化方法
通过统计读取到的字符串长度,计算当前文件指针位置,从而避免在每次遍历中均需使用 file.tell()
获取当前文件指针位置。
分析过程
构造 11202 MB、691130 行的测试数据。具体测试数据特征如下:
- 文件大小:11746098941 Bytes(11202 MB)
- 行数:691130
import time
1. 单进程、单线程遍历文件
t1 = time.time()
with open(path, "r", encoding="UTF-8") as file:
for _ in file:
pass
t2 = time.time()
print(t2 - t1)
运行时间:21.79 秒(三次测试 23.55、20.84、21.00 取平均值)
2. 多线程遍历文件
import os
from theading import Thread
定义每个线程 / 进程的遍历函数如下:通过捕获 UnicodeDecodeError
异常,避免出现刚好切分到半个字的情况;通过在遍历前先 file.readline()
,使每一个切分点中尚未结束的行一定归属上一进程 / 线程而不是下一进程 / 线程,从而保证每一个进程 / 线程遍历的每一行都是完整且互不重复的。
def read(path, start, end):
"""每个进程 / 线程的遍历函数
Parameters
----------
path : str
文件路径
start : int
本分块的开始位置
end : int
本分块的结束位置
"""
cnt = 0
with open(path, "r", encoding="UTF-8") as file:
file.seek(start) # 移动到目标位置
if start != 0:
while True:
try:
file.readline() # 跳过当前行(所有未遍历完的行属于上一段)
break
except UnicodeDecodeError: # 刚好切分到半个字,向后移动一个字符
file.seek(start + 1)
while file.tell() <= end:
file.readline()
cnt += 1
定义多线程遍历函数:
def multi_thread_load_csv(path: str, n_thread: int):
"""多线程遍历函数"""
size = os.path.getsize(path) # 获取文件大小用于分块
thread_lst = []
for i in range(n_thread):
s = size // n_thread * i # 计算当前分块开始位置
e = size // n_thread * (i + 1) # 计算当前分块结束位置
thread = Thread(target=read, args=(s, e))
thread.start()
thread_lst.append(thread)
for thread in thread_lst:
thread.join()
测试线程数为 1 - 10 之间的读取时间,每种线程数测试 10 次,测试代码及结果如下:
import numpy as np
for k in range(1, 11):
use_time = []
for _ in range(10):
t1 = time.time()
multi_thread_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)
print(f"线程={k} 平均时间={np.average(use_time)} 标准差={np.std(use_time)}")
线程数 | 用时的平均值 | 用时的标准差 |
---|---|---|
1 | 49.0841 | 0.6299 |
2 | 53.2189 | 1.4267 |
3 | 53.5290 | 1.3273 |
4 | 56.4923 | 1.4843 |
5 | 56.6679 | 3.2745 |
6 | 56.5164 | 1.7789 |
7 | 58.2352 | 1.1148 |
8 | 58.2353 | 0.6817 |
9 | 60.9896 | 1.3365 |
10 | 64.2063 | 2.3251 |
因为在每一行的遍历中需要增加一次 file.tell()
,所以单线程时的速度相较于直接读取会更慢。
3. 多进程遍历文件
from multiprocessing import Process
定义多进程遍历函数:
def multi_process_load_csv(path: str, n_process: int):
"""多进程遍历函数"""
size = os.path.getsize(path) # 获取文件大小用于分块
process_lst = []
for i in range(n_process):
s = size // n_process * i # 计算当前分块开始位置
e = size // n_process * (i + 1) # 计算当前分块结束位置
process = Process(target=read, args=(s, e))
process.start()
process_lst.append(process)
for process in process_lst:
process.join()
测试线程数为 1 - 10 之间的读取时间,每种线程数测试 10 次,测试代码及结果如下:文章来源:https://www.toymoban.com/news/detail-699457.html
import numpy as np
for k in range(1, 11):
use_time = []
for _ in range(10):
t1 = time.time()
multi_process_load_csv("/home/txjiang/archive_analysis/gather_detail.txt", k)
t2 = time.time()
use_time.append(t2 - t1)
print(f"线程={k} 平均时间={np.average(use_time)} 标准差={np.std(use_time)}")
进程数 | 用时的平均值 | 用时的标准差 |
---|---|---|
1 | 50.1561 | 0.8431 |
2 | 26.5089 | 0.5581 |
3 | 17.7663 | 0.2771 |
4 | 13.4338 | 0.3024 |
5 | 10.7654 | 0.2950 |
6 | 9.1950 | 0.3471 |
7 | 7.7160 | 0.1764 |
8 | 7.0321 | 0.1938 |
9 | 6.3484 | 0.2150 |
10 | 5.6354 | 0.1271 |
11 | 5.1283 | 0.2361 |
12 | 4.7841 | 0.0512 |
13 | 4.5149 | 0.2186 |
14 | 4.1525 | 0.0533 |
15 | 3.9554 | 0.1442 |
16 | 3.8481 | 0.1167 |
17 | 3.6455 | 0.0763 |
18 | 3.4030 | 0.0255 |
19 | 3.3732 | 0.2159 |
20 | 3.1933 | 0.0674 |
21 | 3.0091 | 0.0845 |
22 | 2.9235 | 0.0646 |
23 | 2.9474 | 0.2234 |
24 | 2.7500 | 0.0382 |
25 | 2.6592 | 0.0340 |
26 | 2.5687 | 0.0333 |
27 | 2.6273 | 0.3457 |
28 | 2.4343 | 0.0253 |
29 | 2.3647 | 0.0223 |
30 | 2.2572 | 0.0343 |
因为在每一行的遍历中需要增加一次 file.tell()
,所以单进程时的速度相较于直接读取会更慢。文章来源地址https://www.toymoban.com/news/detail-699457.html
到了这里,关于Python 性能优化|多线程读取文件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!