高性能计算的矩阵乘法优化 - Python +MPI的实现

这篇具有很好参考价值的文章主要介绍了高性能计算的矩阵乘法优化 - Python +MPI的实现。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本次实验的目的是使用MPI的并行性来进行矩阵乘法优化,本人使用 Python 实现

0. 硬件信息

实验硬件:

  • CPU:AMD Ryzen 7 5800H(3.20 GHz)
  • 内存:32GB (3200MHz)

1. 实验要求、数据

要求:使用一个矩阵,一个向量相乘,分别用单进程和多进程的mpi接口实现。

全局的规模参数是 Scale

数据示例

Scale=5时,数据示例如下:

矩阵形式:

[ 2 − 1 0 0 0 − 1 2 − 1 0 0 0 − 1 2 − 1 0 0 0 − 1 2 0 0 0 0 − 1 2 ] \begin{bmatrix}2&-1&0&0&0\\-1&2&-1&0&0\\0&-1&2&-1&0\\0&0&-1&2&0\\0&0&0&-1&2\end{bmatrix} 2100012100012100012100002

向量形式:

[ 1 2 3 1 2 ] \begin{bmatrix}1&2&3&1&2\end{bmatrix} [12312]

备注:矩阵由三个对眼矩阵(方阵)合并而成,向量由1,2,3按顺序重复组成


2. 数据生成实现

generate_example_matrix 使用三个对眼矩阵的蒙版控制三个矩阵相加
generate_example_vector 以重复的1,2,3进行repeat

import numpy as np
from functools import wraps
import time

def generate_example_matrix(h, w):
    _vs = (-1, 2, -1)
    _i = -1  # shift bits
    example_data = np.zeros([h, w], dtype=np.int32)
    for i in range(3):
        example_data_eye_mask = np.eye(h, w, _i + i,
                                       dtype=np.bool_)  # build eyes and shift
        example_data[example_data_eye_mask == True] = _vs[i]
    return example_data


def generate_example_vector(w):
    _rest_dict = {
        1: [1],
        2: [1, 2],
        3: [1, 2, 3],
    }
    rest_bits = int(w % 3)
    repeat_times = int(w // 3)
    example_vector = np.repeat([[1, 2, 3]], repeat_times, axis=0).ravel()

    if rest_bits > 0:
        tail_vec = _rest_dict[rest_bits]
        tail_vec = np.array(tail_vec, dtype=np.int32)
        example_vector = np.concatenate([example_vector, tail_vec], axis=0)
    return example_vector
    

将两个生成函数放在utils.py中。


3. 矩阵计算实现

在numpy里面由@运算和dot运算,这些运算是经过编译优化的,也就是说本身就是并行运算。

本次实验不使用这样的运算,因为numpy的编译优化实在过于强大,我测试过使用numpy的@运算,单进程下居然比mpi还要快。

所以,为了凸显mpi并行优化之后的效果,我们使用相对naive的双重for循环实现。

这是numpy的实现:

# def numpy_method(example_matrix, example_vector):
#    return example_matrix @ example_vector

这是双重for循环的实现:

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

测试两种写法的结果准确性:

    h = 5
    w = 5
    print('--- Current matrix scale is: {} x {} ---'.format(h,w))
    
    example_matrix = generate_example_matrix(h, w)
    example_vector = generate_example_vector(w)
    
    result = numpy_method(example_matrix, example_vector)
    print(result)
    
    result = naive_method(example_matrix, example_vector)
    print(result)

生成的示例数据:

example_matrix:
[[ 2 -1  0  0  0]
 [-1  2 -1  0  0]
 [ 0 -1  2 -1  0]
 [ 0  0 -1  2 -1]
 [ 0  0  0 -1  2]]
 
example_vector:
[1 2 3 1 2]

输出:

--- Current matrix scale is: 5 x 5 ---
[ 0  0  3 -3  3]
[ 0  0  3 -3  3]

4. 单进程实现

from utils import generate_example_matrix, generate_example_vector

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)

def single_main():
    start_time = time.perf_counter()
    h = 5
    w = 5
    print('--- Current matrix scale is: {} x {} ---'.format(h,w))
    example_matrix = generate_example_matrix(h, w)
    example_vector = generate_example_vector(w)
    result = numpy_method(example_matrix, example_vector)
    result = naive_method(example_matrix, example_vector)
    end_time = time.perf_counter()
    print('single method used time is: {:.2f}s\n'.format(end_time - start_time))

if __name__ == '__main__':
    single_main()

5. 多进程实现

使用mpi4py

安装如下:

pip install mpi4py-mpich
pip install mpi4py

由于mpi4py使用的是mpi2-standard的标准,对于超过3GB的序列化对象会打包失败

当矩阵的规模过大的时候(比如计算50000 x 50000的矩阵),在mpi2-standard中,numpy数组的分发会出现缓冲区溢出。

因此我参照mpi4py的issue所述: https://github.com/mpi4py/mpi4py/issues/119

进行了分发的修改:

将实现中的:

input_data = comm.scatter(input_data, root = 0)

改成:

def bug_fix(comm, attributes):
    if comm.size == 0:
        (item,) = attributes
    elif comm.rank == 0:
        for i, attr in enumerate(attributes):
            if i == 0:
                item = attr
            else:
                comm.send(attr, dest=i)
    else:
        item = comm.recv(source=0)
    return item
# -----------------------
# input_data = comm.scatter(input_data, root = 0) # when buffer more than 3GB, failed
input_data = bug_fix(comm, input_data)

并且使用pkl5进行序列化打包:

from mpi4py.util import pkl5

# comm = MPI.COMM_WORLD # before
comm = pkl5.Intercomm(MPI.COMM_WORLD) # after

完整代码如下:

from mpi4py import MPI
import time
import numpy as np
from utils import *
import sys
from mpi4py.util import pkl5

# WARNING: https://github.com/mpi4py/mpi4py/issues/119 
# mpi4py cannot use more 3GB buffer for numpy array


# comm = MPI.COMM_WORLD
comm = pkl5.Intercomm(MPI.COMM_WORLD)
rank = comm.Get_rank()
nprocs = comm.Get_size()

# def numpy_method(example_matrix, example_vector):
#     return example_matrix @ example_vector

def naive_method(example_matrix, example_vector):
    result = []
    h, w = example_matrix.shape
    for hi in range(h):
        colv = example_matrix[hi, :]
        temp = 0
        for wi in range(w):
            temp += colv[wi] * example_vector[wi]
        result.append(temp)
    return np.array(result)


def bug_fix(comm, attributes):
    if comm.size == 0:
        (item,) = attributes
    elif comm.rank == 0:
        for i, attr in enumerate(attributes):
            if i == 0:
                item = attr
            else:
                comm.send(attr, dest=i)
    else:
        item = comm.recv(source=0)
    return item

def main():
    end_time = None
    h, w = int(sys.argv[1]), int(sys.argv[2]) # `h` is the taskstep param
    # ===============================
    # scatter process
    if rank == 0:
        start_time = time.perf_counter()
        example_matrix = generate_example_matrix(h, w)
        example_vector = generate_example_vector(w)
        # input_data = (example_matrix, example_vector)
        slice_length, res = divmod(h, nprocs)
        rank_indexes = np.arange(0, h, slice_length)
        input_data = []
        start_index = rank_indexes[0]
        for i in rank_indexes:
            start_index = i
            sub_example_matrix = example_matrix[i:i + slice_length, :]
            sub_example_vector = example_vector
            input_data.append((sub_example_matrix, sub_example_vector))
            
        if res > 0:
            sub_example_matrix = example_matrix[start_index:, :]
            sub_example_vector = example_vector
            input_data.append((sub_example_matrix, sub_example_vector))
    else:
        input_data = None
        output_data = None
        
    # input_data = comm.scatter(input_data, root = 0)
    input_data = bug_fix(comm, input_data)
    sub_example_matrix, sub_example_vector = input_data
    
    result = naive_method(sub_example_matrix, sub_example_vector)
    output_data = comm.gather(result, root = 0)

    # gather process
    if rank == 0:
        output_data = np.concatenate(output_data, axis = 0)
        print(output_data)
        # =================
        end_time = time.perf_counter()
        print('mpi method by process-size:{} used time is: {:.2f}s\n'.format(nprocs, end_time - start_time))
    else:
        input_data = None
        output_data = None

if __name__ == '__main__':
    main()

启动以上脚本的测试:

将以上脚本命名为:multiprocess_main.py

启动mpi接口的命令:

mpiexec -np [进程数] python multiprocess_main.py [矩阵H参数] [矩阵W参数]

示例:

mpiexec -np 8 python multiprocess_main.py 5000 5000

使用8个进程,以并行进程计算5000x5000的矩阵和5000x1进行矩乘。

总流程如下:

  • step1:在root = 0的进程进行分发。(根据tasksteps,即矩阵的规模)
  • step2:每个进程使用双重循环计算元素相乘
  • step3:所有子进程将运算结果规约到root = 0的进程
  • step4:只保留root0的运算结果

6. 优化结果对比

设置4组不同规模的测试参数,以10倍规模递增

scale = 5, 500, 5000, 50000

多进程并行进程数测试参数:5,10,16,20,25

关于强拓展性和弱拓展性:

  • 强扩展:如果在增加进程或线程个数时,可以维持固定效率,却不增加问题规模
  • 弱扩展:如果在增加进程或线程个数时,只有以相同倍率增加问题规模才能保持效率值

强拓展和弱拓展探究的是并行开销和并行价值之间权衡的哲学

单进程测试结果

--- Current matrix scale is: 5 x 5 ---
single method used time is: 0.000268s
--- Current matrix scale is: 50 x 50 ---
single method used time is: 0.000843s
--- Current matrix scale is: 500 x 500 ---
single method used time is: 0.045645s
--- Current matrix scale is: 5000 x 5000 ---
single method used time is: 5.010791s
--- Current matrix scale is: 50000 x 50000 ---
single method used time is: 498.076443s

分析:由上可见,当规模是5000的时候,单进程的运算时间还可以接受,但是再上升一个数量级的时候,运行时间发生剧增,使用了将近500秒的时间完成。


多进程测试结果

5进程并行,计算规模5

--- Current matrix scale is: 5 x 5 ---
mpi method by process-size:5 used time is: 0.001074s

5进程并行,计算规模50

--- Current matrix scale is: 50 x 50 ---
mpi method by process-size:5 used time is: 0.001124s

5进程并行,计算规模500

--- Current matrix scale is: 500 x 500 ---
mpi method by process-size:5 used time is: 0.011929s

5进程并行,计算规模5000

--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:5 used time is: 1.182357s

5进程并行,计算规模50000

--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:5 used time is: 126.274598s

10进程并行,计算规模50

--- Current matrix scale is: 50 x 50 ---
mpi method by process-size:10 used time is: 0.007400s

10进程并行,计算规模500

--- Current matrix scale is: 500 x 500 ---
mpi method by process-size:10 used time is: 0.016241s

10进程并行,计算规模5000

--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:10 used time is: 0.802658s

10进程并行,计算规模50000

--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:10 used time is: 84.558449s

16进程并行,计算规模50000

--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:16 used time is: 70.913076s

20进程并行,计算规模5000

--- Current matrix scale is: 5000 x 5000 ---
mpi method by process-size:20 used time is: 0.805241s

20进程并行,计算规模50000

--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:20 used time is: 73.206234s

25进程并行,计算规模50000

--- Current matrix scale is: 50000 x 50000 ---
mpi method by process-size:25 used time is: 73.724819s

汇总表格

单进程:

规模 时间(s)
5 0.00026
50 0.00084
500 0.04564
5000 5.0107
50000 498.07644

并行进程数:5

规模 时间(s)
5 0.00107
50 0.00112
500 0.01192
5000 1.18235
50000 126.27459

并行进程数:10

规模 时间(s)
50 0.00740
500 0.01624
5000 0.80265
50000 84.55844

并行进程数:16

规模 时间(s)
50000 70.91307

并行进程数:20

规模 时间(s)
5000 0.80524
50000 73.20623

并行进程数:25

规模 时间(s)
50000 73.72481

分析:可见在多进程并行起到优化作用,而当进程数达到一定的时候(16),出现优化停止的现象,时间并没有减少,反而在进程数大于16的时候出现增加,说明并行开销已经出现负反馈,因此需要增加问题规模去提高优化率。在面临资源极限之前,呈现强拓展性。当达到资源极限的时候,出现弱拓展性。文章来源地址https://www.toymoban.com/news/detail-421462.html

到了这里,关于高性能计算的矩阵乘法优化 - Python +MPI的实现的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 《高性能MySQL》——查询性能优化(笔记)

    将查询看作一个任务,那么它由一系列子任务组成,实际我们所做的就是: 消除一些子任务 减少子任务的执行次数 让子任务运行更快 查询的生命周期大概可分为 = { 客户端 服务器 : 进行解析 , 生成执行计划 执行:包括到存储引擎的调用,以及用后的数据处理 { 排序 分组

    2024年02月13日
    浏览(57)
  • C++高性能优化编程之如何测量性能(一)

    C++高性能优化编程系列 深入理解设计原则系列 深入理解设计模式系列 高级C++并发线程编程 不好的编程习惯,不重视程序性能测量分析让代码跑的更快,会导致 浪费大量的CPU周期、程序响应时间慢以及卡顿,用户满意度下降,进而浪费大量的时间返工去重构本应该一开始就

    2024年02月06日
    浏览(65)
  • MySQL高性能优化规范建议

    数据库命令规范 数据库基本设计规范 1. 所有表必须使用 Innodb 存储引擎 2. 数据库和表的字符集统一使用 UTF8 3. 所有表和字段都需要添加注释 4. 尽量控制单表数据量的大小,建议控制在 500 万以内。 5. 谨慎使用 MySQL 分区表 6.尽量做到冷热数据分离,减小表的宽度 7. 禁止在表中建

    2024年02月12日
    浏览(52)
  • Kafka高性能集群部署与优化

    Kafka 是由Apache Software Foundation开发的一个分布式流处理平台,源代码以Scala编写。Kafka最初是由LinkedIn公司开发的,于2011年成为Apache的顶级项目之一。它是一种高吞吐量、可扩展的发布订阅消息系统,具有以下特点: 高吞吐量:Kafka每秒可以处理数百万条消息。 持久化:数据存

    2024年02月13日
    浏览(62)
  • 如何评估和优化系统的高性能

    系统的关键性能指标:吞吐量,延迟和TP。 吞吐量:反应单位时间内处理请求的能力。 延迟:从客户端发送请求到接收响应的时间。 延迟和吞吐量的曲线如下图所示: 总体来看,随着压力增大,系统单位时间内被访问的次数增加。结合延迟和吞吐量观察的话,系统优化性能

    2024年02月22日
    浏览(59)
  • 数据库——MySQL高性能优化规范

    所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用 MySQL 保留(如果表名中包含查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过 32 个字符 临时库表必须以 tmp_为前缀并以日期为后缀,

    2024年02月11日
    浏览(103)
  • 读高性能MySQL(第4版)笔记10_查询性能优化(上)

    4.11.1.1. 在存储引擎层完成的 4.11.2.1. 直接从索引中过滤不需要的记录并返回命中的结 4.11.2.2. 在MySQL服务器层完成的,但无须再回表查询记录 4.11.3.1. 在MySQL服务器层完成 4.11.3.2. 需要先从数据表中读出记录然后过滤 4.13.2.1. 使用单独的汇总表 5.5.1.1. 定期清除大量数据时,

    2024年02月08日
    浏览(60)
  • 读高性能MySQL(第4版)笔记12_查询性能优化(下)

    2.3.1.1. 读取行指针和需要排序的字段,对其进行排序,然后再根据排序结果读取所需要的数据行 2.3.1.2. 即需要从数据表中读取两次数据,第二次读取数据的时候,因为是读取排序列进行排序后的所有记录,这会产生大量的随机I/O,所以两次传输排序的成本非常高 2.3.2.1. 先

    2024年02月08日
    浏览(49)
  • 企业如何构建高性能计算云?

    HPC是推动科学和工程应用发展的重要组成部分。除了将处理器向Exascale迈进之外,工作负载的性质也在不断变化—从传统的模拟和建模到混合工作负载,包括企业内部和云应用,还需要整合、吸收和分析来自无数物联网传感器的数据。同时,随着HPC基础设施上的人工智能工作

    2024年02月03日
    浏览(52)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包