The goal of this experiment is to use MPI parallelism to speed up matrix multiplication. The implementation is in Python.
0. Hardware
Experimental hardware:
- CPU: AMD Ryzen 7 5800H (3.20 GHz)
- RAM: 32 GB (3200 MHz)
1. Requirements and data
Requirement: multiply a matrix by a vector, implemented once as a single process and once with the multi-process MPI interface.
The global size parameter is Scale.
Example data for Scale = 5:
Matrix:
$$\begin{bmatrix}2&-1&0&0&0\\-1&2&-1&0&0\\0&-1&2&-1&0\\0&0&-1&2&-1\\0&0&0&-1&2\end{bmatrix}$$
Vector:
$$\begin{bmatrix}1&2&3&1&2\end{bmatrix}$$
Note: the matrix is the sum of three shifted diagonal (eye) matrices, and the vector repeats the sequence 1, 2, 3 in order.
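In general, for any Scale = n (the code in the next section uses a separate height h and width w, with h = w = n here) the matrix is the n x n tridiagonal matrix

$$A_{ij} = \begin{cases} 2, & i = j \\ -1, & |i - j| = 1 \\ 0, & \text{otherwise,} \end{cases}$$

and the vector cycles through the values 1, 2, 3.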
2. Data generation
generate_example_matrix adds up three shifted eye matrices, each selected through a boolean mask.
generate_example_vector repeats the pattern 1, 2, 3 and appends the remainder.
import numpy as np
from functools import wraps
import time

def generate_example_matrix(h, w):
    _vs = (-1, 2, -1)
    _i = -1  # diagonal offset: start one diagonal below the main one
    example_data = np.zeros([h, w], dtype=np.int32)
    for i in range(3):
        # build an eye mask on diagonal (_i + i) and fill it with the matching value
        example_data_eye_mask = np.eye(h, w, _i + i, dtype=np.bool_)
        example_data[example_data_eye_mask == True] = _vs[i]
    return example_data

def generate_example_vector(w):
    _rest_dict = {
        1: [1],
        2: [1, 2],
        3: [1, 2, 3],
    }
    rest_bits = int(w % 3)
    repeat_times = int(w // 3)
    example_vector = np.repeat([[1, 2, 3]], repeat_times, axis=0).ravel()
    if rest_bits > 0:
        tail_vec = _rest_dict[rest_bits]
        tail_vec = np.array(tail_vec, dtype=np.int32)
        example_vector = np.concatenate([example_vector, tail_vec], axis=0)
    return example_vector
Both generator functions live in utils.py.
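As a quick usage check (my own example, not part of the original test script), the remainder handling can be seen with a width that is not a multiple of 3; the expected output is shown as comments:

from utils import generate_example_matrix, generate_example_vector

print(generate_example_vector(7))      # [1 2 3 1 2 3 1] -> two full repeats plus a 1-element tail
print(generate_example_matrix(3, 3))
# [[ 2 -1  0]
#  [-1  2 -1]
#  [ 0 -1  2]]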
3. Matrix computation
NumPy provides the @ operator and dot, both of which run compiled, heavily optimized (and internally parallelized) kernels.
This experiment does not use them as the baseline: NumPy's optimized kernels are so fast that in my tests the single-process @ version actually beat the MPI version.
So, to make the effect of MPI parallelization visible, we use a relatively naive double for-loop implementation.
The NumPy implementation (kept only as a correctness reference):
def numpy_method(example_matrix, example_vector):
    return example_matrix @ example_vector
The double for-loop implementation:
def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]  # the hi-th row of the matrix
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)
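This double loop is just the textbook definition of the matrix-vector product, one dot product per row:

$$y_i = \sum_{j=0}^{w-1} A_{ij}\, x_j, \qquad i = 0, \dots, h-1$$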
Check that both implementations produce the same result:
h = 5
w = 5
print('--- Current matrix scale is: {} x {} ---'.format(h,w))
example_matrix = generate_example_matrix(h, w)
example_vector = generate_example_vector(w)
result = numpy_method(example_matrix, example_vector)
print(result)
result = naive_method(example_matrix, example_vector)
print(result)
Generated example data:
example_matrix:
[[ 2 -1 0 0 0]
[-1 2 -1 0 0]
[ 0 -1 2 -1 0]
[ 0 0 -1 2 -1]
[ 0 0 0 -1 2]]
example_vector:
[1 2 3 1 2]
Output:
--- Current matrix scale is: 5 x 5 ---
[ 0 0 3 -3 3]
[ 0 0 3 -3 3]
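To back up the claim above that NumPy's compiled @ kernel beats the naive loop even without MPI, here is a minimal single-process timing sketch (my own, reusing the two functions above; absolute numbers depend on the machine):

import time
from utils import generate_example_matrix, generate_example_vector

h = w = 5000
A = generate_example_matrix(h, w)
x = generate_example_vector(w)

t0 = time.perf_counter()
_ = numpy_method(A, x)     # compiled NumPy kernel
t1 = time.perf_counter()
_ = naive_method(A, x)     # pure-Python double loop
t2 = time.perf_counter()

print('numpy @ : {:.6f}s'.format(t1 - t0))
print('naive   : {:.6f}s'.format(t2 - t1))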
4. Single-process implementation
import time
import numpy as np
from utils import generate_example_matrix, generate_example_vector

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]  # the hi-th row of the matrix
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

def single_main():
    start_time = time.perf_counter()
    h = 5  # adjust the scale here (5, 50, 500, 5000, 50000) to reproduce the timings in section 6
    w = 5
    print('--- Current matrix scale is: {} x {} ---'.format(h, w))
    example_matrix = generate_example_matrix(h, w)
    example_vector = generate_example_vector(w)
    result = naive_method(example_matrix, example_vector)
    end_time = time.perf_counter()
    print('single method used time is: {:.6f}s\n'.format(end_time - start_time))

if __name__ == '__main__':
    single_main()
5. Multi-process implementation
We use mpi4py.
Installation:
pip install mpi4py-mpich
pip install mpi4py
Because mpi4py follows the MPI-2 standard, packing a serialized object larger than about 3 GB fails.
When the matrix is very large (e.g. 50000 x 50000), scattering the NumPy array therefore overflows the send buffer: a 50000 x 50000 int32 matrix alone takes 50000 * 50000 * 4 bytes, roughly 9.3 GiB, so the pickled list of sub-matrices held on rank 0 is far beyond that limit.
Following the mpi4py issue https://github.com/mpi4py/mpi4py/issues/119,
I changed the distribution step as follows.
Replace this line of the implementation:
input_data = comm.scatter(input_data, root = 0)
with:
def bug_fix(comm, attributes):
    # manual scatter: rank 0 keeps chunk 0 and sends chunk i to rank i
    if comm.size == 1:
        (item,) = attributes
    elif comm.rank == 0:
        for i, attr in enumerate(attributes):
            if i == 0:
                item = attr
            else:
                comm.send(attr, dest=i)
    else:
        item = comm.recv(source=0)
    return item

# -----------------------
# input_data = comm.scatter(input_data, root = 0)  # fails when the buffer exceeds ~3 GB
input_data = bug_fix(comm, input_data)
and serialize with pkl5 (pickle protocol 5 with out-of-band buffers):
from mpi4py.util import pkl5

# comm = MPI.COMM_WORLD                  # before
comm = pkl5.Intercomm(MPI.COMM_WORLD)    # after
The complete code:
from mpi4py import MPI
import time
import numpy as np
from utils import *
import sys
from mpi4py.util import pkl5

# WARNING: https://github.com/mpi4py/mpi4py/issues/119
# mpi4py cannot pack buffers of more than ~3 GB for a pickled numpy array
# comm = MPI.COMM_WORLD
comm = pkl5.Intercomm(MPI.COMM_WORLD)
rank = comm.Get_rank()
nprocs = comm.Get_size()

# def numpy_method(example_matrix, example_vector):
#     return example_matrix @ example_vector

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]  # the hi-th row of the matrix
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

def bug_fix(comm, attributes):
    # manual scatter: rank 0 keeps chunk 0 and sends chunk i to rank i
    if comm.size == 1:
        (item,) = attributes
    elif comm.rank == 0:
        for i, attr in enumerate(attributes):
            if i == 0:
                item = attr
            else:
                comm.send(attr, dest=i)
    else:
        item = comm.recv(source=0)
    return item

def main():
    end_time = None
    h, w = int(sys.argv[1]), int(sys.argv[2])  # h controls how many row-tasks there are
    # ===============================
    # scatter process
    if rank == 0:
        start_time = time.perf_counter()
        example_matrix = generate_example_matrix(h, w)
        example_vector = generate_example_vector(w)
        # split the matrix row-wise into exactly nprocs chunks;
        # the first `res` chunks take one extra row when h is not divisible by nprocs
        slice_length, res = divmod(h, nprocs)
        input_data = []
        start_index = 0
        for i in range(nprocs):
            rows = slice_length + (1 if i < res else 0)
            sub_example_matrix = example_matrix[start_index:start_index + rows, :]
            sub_example_vector = example_vector
            input_data.append((sub_example_matrix, sub_example_vector))
            start_index += rows
    else:
        input_data = None
        output_data = None
    # input_data = comm.scatter(input_data, root = 0)
    input_data = bug_fix(comm, input_data)
    sub_example_matrix, sub_example_vector = input_data
    result = naive_method(sub_example_matrix, sub_example_vector)
    output_data = comm.gather(result, root = 0)
    # gather process
    if rank == 0:
        output_data = np.concatenate(output_data, axis = 0)
        print(output_data)
        # =================
        end_time = time.perf_counter()
        print('mpi method by process-size:{} used time is: {:.6f}s\n'.format(nprocs, end_time - start_time))

if __name__ == '__main__':
    main()
Running the script:
Save the script above as multiprocess_main.py.
Launch it through the MPI interface with:
mpiexec -np [number of processes] python multiprocess_main.py [matrix H] [matrix W]
Example:
mpiexec -np 8 python multiprocess_main.py 5000 5000
This uses 8 parallel processes to multiply a 5000 x 5000 matrix by a 5000 x 1 vector.
Overall flow:
- step 1: the rank 0 process partitions the matrix into row blocks (sized by the problem scale) and distributes them
- step 2: each process multiplies its block by the vector with the double for-loop
- step 3: all processes gather their partial results back to rank 0
- step 4: only rank 0 keeps the concatenated result (a small correctness check is sketched below)
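For moderate problem sizes the gathered result can be cross-checked against NumPy directly on rank 0. A minimal sketch of such a check (my own addition, placed inside the `if rank == 0:` gather block of main, where example_matrix and example_vector still exist):

if rank == 0:
    output_data = np.concatenate(output_data, axis = 0)
    # cross-check the MPI result against NumPy's own matrix-vector product
    expected = example_matrix @ example_vector
    assert np.array_equal(output_data, expected), 'MPI result differs from NumPy reference'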
6. Results and comparison
Five problem scales were tested, increasing by a factor of 10 each time:
Scale = 5, 50, 500, 5000, 50000
Process counts tested for the MPI version: 5, 10, 16, 20, 25
On strong and weak scaling:
- Strong scaling: efficiency is maintained as the number of processes or threads grows while the problem size stays fixed
- Weak scaling: efficiency is maintained only if the problem size grows at the same rate as the number of processes or threads
Strong and weak scaling capture the trade-off between parallel overhead and the payoff from parallelism; the formulas below make this quantitative.
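These notions are usually expressed through speedup and parallel efficiency, where $T_1$ is the single-process time and $T_p$ the time with $p$ processes:

$$S(p) = \frac{T_1}{T_p}, \qquad E(p) = \frac{S(p)}{p}$$

Strong scaling keeps $E(p)$ roughly constant at a fixed problem size; weak scaling needs the problem size to grow with $p$ to do so.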
Single-process results:
--- Current matrix scale is: 5 x 5 ---
single method used time is: 0.000268s
--- Current matrix scale is: 50 x 50 ---
single method used time is: 0.000843s
--- Current matrix scale is: 500 x 500 ---
single method used time is: 0.045645s
--- Current matrix scale is: 5000 x 5000 ---
single method used time is: 5.010791s
--- Current matrix scale is: 50000 x 50000 ---
single method used time is: 498.076443s
Analysis: at Scale = 5000 the single-process run time is still acceptable, but one order of magnitude higher the run time explodes, taking almost 500 seconds.
Multi-process results:
5 processes, Scale = 5
--- Current matrix scale is: 5 x 5 ---
mpi method by process-size:5 used time is: 0.001074s
5 processes, Scale = 50
--- Current matrix scale is: 50 x 50 ---
mpi method by process-size:5 used time is: 0.001124s
5 processes, Scale = 500
--- Current matrix scale is: 500 x 500 ---
mpi method by process-size:5 used time is: 0.011929s
5 processes, Scale = 5000
--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:5 used time is: 1.182357s
5 processes, Scale = 50000
--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:5 used time is: 126.274598s
10 processes, Scale = 50
--- Current matrix scale is: 50 x 50 ---
mpi method by process-size:10 used time is: 0.007400s
10 processes, Scale = 500
--- Current matrix scale is: 500 x 500 ---
mpi method by process-size:10 used time is: 0.016241s
10 processes, Scale = 5000
--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:10 used time is: 0.802658s
10 processes, Scale = 50000
--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:10 used time is: 84.558449s
16 processes, Scale = 50000
--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:16 used time is: 70.913076s
20 processes, Scale = 5000
--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:20 used time is: 0.805241s
20 processes, Scale = 50000
--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:20 used time is: 73.206234s
25 processes, Scale = 50000
--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:25 used time is: 73.724819s
Summary tables:
Single process:
| Scale | Time (s) |
|---|---|
| 5 | 0.00026 |
| 50 | 0.00084 |
| 500 | 0.04564 |
| 5000 | 5.0107 |
| 50000 | 498.07644 |
Processes: 5
| Scale | Time (s) |
|---|---|
| 5 | 0.00107 |
| 50 | 0.00112 |
| 500 | 0.01192 |
| 5000 | 1.18235 |
| 50000 | 126.27459 |
Processes: 10
| Scale | Time (s) |
|---|---|
| 50 | 0.00740 |
| 500 | 0.01624 |
| 5000 | 0.80265 |
| 50000 | 84.55844 |
Processes: 16
| Scale | Time (s) |
|---|---|
| 50000 | 70.91307 |
Processes: 20
| Scale | Time (s) |
|---|---|
| 5000 | 0.80524 |
| 50000 | 73.20623 |
Processes: 25
| Scale | Time (s) |
|---|---|
| 50000 | 73.72481 |
Analysis: multi-process parallelism clearly helps. Once the process count reaches about 16, however, the improvement stops: the run time no longer decreases and even rises slightly beyond 16 processes (the 5800H has 8 cores / 16 hardware threads), which means the parallel overhead has started to outweigh the gain, and only a larger problem size would raise the efficiency again. Before this resource limit is reached the program shows strong scaling; at and beyond the limit, only weak scaling remains.
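A quick back-of-the-envelope check with the measured Scale = 50000 times supports this reading (a sketch; the times are copied from the tables above):

t1 = 498.07644                 # single-process time at Scale = 50000
tp = {5: 126.27459, 10: 84.55844, 16: 70.91307, 20: 73.20623, 25: 73.72481}
for p, t in sorted(tp.items()):
    speedup = t1 / t           # S(p) = T1 / Tp
    efficiency = speedup / p   # E(p) = S(p) / p
    print('p={:2d}  speedup={:.2f}  efficiency={:.2f}'.format(p, speedup, efficiency))
# speedup grows to about 3.9 (p=5), 5.9 (p=10), 7.0 (p=16) and then flattens,
# while efficiency keeps falling, which matches the plateau described above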