【深度学习】【分布式训练】Collective通信操作及Pytorch示例

这篇具有很好参考价值的文章主要介绍了【深度学习】【分布式训练】Collective通信操作及Pytorch示例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

相关博客
【Megatron-DeepSpeed】张量并行工具代码mpu详解(一):并行环境初始化
【Megatron-DeepSpeed】张量并行工具代码mpu详解(二):Collective通信操作的封装mappings
【深度学习】【分布式训练】DeepSpeed:AllReduce与ZeRO-DP
【深度学习】混合精度训练与显存分析
【深度学习】【分布式训练】Collective通信操作及Pytorch示例
【自然语言处理】【大模型】大语言模型BLOOM推理工具测试
【自然语言处理】【大模型】GLM-130B:一个开源双语预训练语言模型
【自然语言处理】【大模型】用于大型Transformer的8-bit矩阵乘法介绍
【自然语言处理】【大模型】BLOOM:一个176B参数且可开放获取的多语言模型

Collective通信操作及Pytorch示例

​ 大模型时代,单机已经无法完成先进模型的训练和推理,分布式训练和推理将会是必然的选择。各类分布式训练和推断工具都会使用到Collective通信。网络上大多数的教程仅简单介绍这些操作的原理,没有代码示例来辅助理解。本文会介绍各类Collective通信操作,并展示pytorch中如何使用

一、Collective通信操作

1. AllReduce

​ 将各个显卡的张量进行聚合(sum、min、max)后,再将结果写回至各个显卡。

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

2. Broadcast

​ 将张量从某张卡广播至所有卡。

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

3. Reduce

​ 执行同AllReduce相同的操作,但结果仅写入具有的某个显卡。

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

4. AllGather

​ 每个显卡上有一个大小为N的张量,共有k个显卡。经过AllGather后将所有显卡上的张量合并为一个 N × k N\times k N×k的张量,然后将结果分配至所有显卡上。

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

5. ReduceScatter

​ 执行Reduce相同的操作,但是结果会被分散至不同的显卡。

【深度学习】【分布式训练】Collective通信操作及Pytorch示例

二、Pytorch示例

​ pytorch的分布式包torch.distributed能够方便的实现跨进程和跨机器集群的并行计算。本文代码运行在单机双卡服务器上,并基于下面的模板来执行不同的分布式操作。

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_process(rank, size, fn, backend='nccl'):
    """
    为每个进程初始化分布式环境,保证相互之间可以通信,并调用函数fn。
    """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    
    
def run(world_size, func):
    """
    启动world_size个进程,并执行函数func。
    """
    processes = []
    mp.set_start_method("spawn")
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, func))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
        
if __name__ == "__main__":
    run(2, func) # 这里的func随后会被替换为不同的分布式示例函数
    pass

​ 先对上面的模板做一些简单的介绍。

  • 函数run会根据传入的参数world_size,生成对应数量的进程。每个进程都会调用init_process来初始化分布式环境,并调用传入的分布式示例函数。
  • torch.distributed.init_process_group(),该方法负责各进程之间的初始协调,保证各进程都会与master进行握手。该方法在调用完成之前会一直阻塞,并且后续的所有操作都必须在该操作之后。调用该方法时需要初始化下面的4个环境变量:
    • MASTER_PORT:rank 0进程所在机器上的空闲端口;
    • MASTER_ADDR:rank 0进程所在机器上的IP地址;
    • WORLD_SIZE:进程总数;
    • RANK:每个进程的RANK,所以每个进程知道其是否是master;

1. 点对点通信

​ 在介绍其他collective通信之前,先看一个简单的点对点通信实现。

def p2p_block_func(rank, size):
    """
    将rank src上的tensor发送至rank dst(阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    # 对于rank src,该tensor用于发送
    # 对于rank dst,该tensor用于接收
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 发送tensor([1.])
        # group指定了该操作所见进程的范围,默认情况下是整个world
        dist.send(tensor=tensor, dst=1, group=group)
    elif rank == dst:
        # rank dst的tensor初始化为tensor([0.]),但接收后为tensor([1.])
        dist.recv(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_block_func)

p2p_block_func实现从rank 0发送一个tensor([1.0])至rank 1,该操作在发送完成/接收完成之前都会阻塞。

​ 下面是一个不阻塞的版本:

def p2p_unblock_func(rank, size):
    """
    将rank src上的tensor发送至rank dst(非阻塞)。
    """
    src = 0
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.zeros(1).to(torch.device("cuda", rank))
    if rank == src:
        tensor += 1
        # 非阻塞发送
        req = dist.isend(tensor=tensor, dst=dst, group=group)
        print("Rank 0 started sending")
    elif rank == dst:
        # 非阻塞接收
        req = dist.irecv(tensor=tensor, src=src, group=group)
        print("Rank 1 started receiving")
    req.wait()
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, p2p_unblock_func)

p2p_unblock_func是非阻塞版本的点对点通信。使用非阻塞方法时,因为不知道数据何时送达,所以在req.wait()完成之前不要对发送/接收的tensor进行任何操作。

2. Broadcast

def broadcast_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    if rank == src:
        # 对于rank src,初始化tensor([1.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank)) + 1
    else:
        # 对于非rank src,初始化tensor([0.])
        tensor = torch.zeros(1).to(torch.device("cuda", rank))
    # 对于rank src,broadcast是发送;否则,则是接收
    dist.broadcast(tensor=tensor, src=0, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, broadcast_func)

broadcast_func会将rank 0上的tensor([1.])广播至所有的rank上。

3. Reduce与Allreduce

def reduce_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # 对于所有rank都会发送, 但仅有dst会接收求和的结果
    dist.reduce(tensor, dst=dst, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_func)

reduce_func会对group中所有rank的tensor进行聚合,并将结果发送至rank dst。

def allreduce_func(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.ones(1).to(torch.device("cuda", rank))
    # tensor即用来发送,也用来接收
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, allreduce_func)

allreduce_func将group中所有rank的tensor进行聚合,并将结果发送至group中的所有rank。

4. Gather与Allgather

def gather_func(rank, size):
    dst = 1
    group = dist.new_group(list(range(size)))
    # 该tensor用于发送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    gather_list = []
    if rank == dst:
        # gather_list中的tensor数量应该是size个,用于接收其他rank发送来的tensor
        gather_list = [torch.zeros(1).to(torch.device("cuda", dst)) for _ in range(size)]
        # 仅在rank dst上需要指定gather_list
        dist.gather(tensor, gather_list=gather_list, dst=dst, group=group)
    else:
        # 非rank dst,相当于发送tensor
        dist.gather(tensor, dst=dst, group=group)
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, gather_func)

gather_func从group中所有rank上收集tensor,并发送至rank dst。(相当于不进行聚合操作的reduce)

def allgather_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 该tensor用于发送
    tensor = torch.zeros(1).to(torch.device("cuda", rank)) + rank
    # gether_list用于接收各个rank发送来的tensor
    gather_list = [torch.zeros(1).to(torch.device("cuda", rank)) for _ in range(size)]
    dist.all_gather(gather_list, tensor, group=group)
    # 各个rank的gather_list均一致
    print('Rank ', rank, ' has data ', gather_list)
    
if __name__ == "__main__":
    run(2, allgather_func)

allgather_func从group中所有rank上收集tensor,并将收集到的tensor发送至所有group中的rank。

5. Scatter与ReduceScatter

def scatter_func(rank, size):
    src = 0
    group = dist.new_group(list(range(size)))
    # 各个rank用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    if rank == src:
        # 在rank src上,将tensor_list中的tensor分发至不同的rank上
        # tensor_list:[tensor([1.]), tensor([2.])]
        tensor_list = [torch.tensor([i + 1], dtype=torch.float32).to(torch.device("cuda", rank)) for i in range(size)]
        # 将tensor_list发送至各个rank
        # 接收属于rank src的那部分tensor
        dist.scatter(tensor, scatter_list=tensor_list, src=0, group=group)
    else:
        # 接收属于对应rank的tensor
        dist.scatter(tensor, scatter_list=[], src=0, group=group)
    # 每个rank都拥有tensor_list中的一部分tensor
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, scatter_func)

scatter_func会将rank src中的一组tensor逐个分发至其他rank上,每个rank持有的tensor不同。

def reduce_scatter_func(rank, size):
    group = dist.new_group(list(range(size)))
    # 用于接收的tensor
    tensor = torch.empty(1).to(torch.device("cuda", rank))
    # 用于发送的tensor列表
    # 对于每个rank,有tensor_list=[tensor([0.]), tensor([1.])]
    tensor_list = [torch.Tensor([i]).to(torch.device("cuda", rank)) for i in range(size)]
    # step1. 经过reduce的操作会得到tensor列表[tensor([0.]), tensor([2.])]
    # step2. tensor列表[tensor([0.]), tensor([2.])]分发至各个rank
    # rank 0得到tensor([0.]),rank 1得到tensor([2.])
    dist.reduce_scatter(tensor, tensor_list, op=dist.ReduceOp.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor)
    
if __name__ == "__main__":
    run(2, reduce_scatter_func)

参考资料

https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/collectives.html

https://pytorch.org/tutorials/intermediate/dist_tuto.html#collective-communication

https://pytorch.org/docs/stable/distributed.html#collective-functions文章来源地址https://www.toymoban.com/news/detail-412125.html

到了这里,关于【深度学习】【分布式训练】Collective通信操作及Pytorch示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 机器学习洞察 | 分布式训练让机器学习更加快速准确

    机器学习能够基于数据发现一般化规律的优势日益突显,我们看到有越来越多的开发者关注如何训练出更快速、更准确的机器学习模型,而分布式训练 (Distributed Training) 则能够大幅加速这一进程。 亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档

    2024年02月16日
    浏览(48)
  • 1、pytorch分布式数据训练结合学习率周期及混合精度

    正如标题所写,我们正常的普通训练都是单机单卡或单机多卡。而往往一个高精度的模型需要训练时间很长,所以DDP分布式数据并行和混合精度可以加速模型训练。混精可以增大batch size. 如下提供示例代码,经过官网查阅验证的。原始代码由百度文心一言提供。 问题:pytor

    2024年02月07日
    浏览(40)
  • 分布式深度学习库BigDL简述

            BigDL是一个在Apache Spark上构建的分布式深度学习库,由英特尔开发并开源。它允许用户使用Scala或Python语言在大规模数据集上进行深度学习模型的训练和推理。BigDL提供了许多常见的深度学习模型和算法的实现,包括卷积神经网络(CNN)、循环神经网络(RNN)等。由

    2024年04月10日
    浏览(38)
  • 分布式深度学习中的数据并行和模型并行

    🎀个人主页: https://zhangxiaoshu.blog.csdn.net 📢欢迎大家:关注🔍+点赞👍+评论📝+收藏⭐️,如有错误敬请指正! 💕未来很长,值得我们全力奔赴更美好的生活! 对于深度学习模型的预训练阶段,海量的训练数据、超大规模的模型给深度学习带来了日益严峻的挑战,因此,经

    2024年01月24日
    浏览(45)
  • AI框架:9大主流分布式深度学习框架简介

    转载翻译Medium上一篇关于分布式深度学习框架的文章 https://medium.com/@mlblogging.k/9-libraries-for-parallel-distributed-training-inference-of-deep-learning-models-5faa86199c1fmedium.com/@mlblogging.k/9-libraries-for-parallel-distributed-training-inference-of-deep-learning-models-5faa86199c1f 大型深度学习模型在训练时需要大量内

    2024年02月09日
    浏览(49)
  • 【分布式训练】基于Pytorch的分布式数据并行训练

    简介: 在PyTorch中使用DistributedDataParallel进行多GPU分布式模型训练 加速神经网络训练的最简单方法是使用GPU,它在神经网络中常见的计算类型(矩阵乘法和加法)上提供了比CPU更大的加速。随着模型或数据集变得越来越大,一个GPU很快就会变得不足。例如,像BERT和GPT-2这样的

    2024年02月17日
    浏览(51)
  • 【分布式训练】基于PyTorch进行多GPU分布式模型训练(补充)

    简介: 在PyTorch中使用DistributedDataParallel进行多GPU分布式模型训练。 原文链接:https://towardsdatascience.com/distributed-model-training-in-pytorch-using-distributeddataparallel-d3d3864dc2a7 随着以ChatGPT为代表的大模型的不断涌现,如何在合理的时间内训练大模型逐渐成为一个重要的研究课题。为了解

    2024年02月16日
    浏览(47)
  • 【分布式】大模型分布式训练入门与实践 - 04

    【分布式】NCCL部署与测试 - 01 【分布式】入门级NCCL多机并行实践 - 02 【分布式】小白看Ring算法 - 03 【分布式】大模型分布式训练入门与实践 - 04 数据并行(Distributed Data Parallel)是一种用于加快深度学习模型训练速度的技术。在过去,训练大型模型往往受限于单卡训练的瓶颈

    2024年02月08日
    浏览(39)
  • pytorch 分布式训练

    分布式训练分为这几类: 按照并行方式来分:模型并行 vs 数据并行 按照更新方式来分:同步更新 vs 异步更新 按照算法来分:Parameter Server算法 vs AllReduce算法 这个函数主要有三个参数: module:即模型,此处注意,虽然输入数据被均分到不同gpu上,但每个gpu上都要拷贝一份模

    2024年02月12日
    浏览(45)
  • 大语言模型的分布式训练

    什么是大语言模型 训练方式 面临的挑战 什么是分布式计算 如何实现 拆分逻辑 分发逻辑 大语言模型的分布式训练 数据并行 模型并行 流水线并行 张量并行 通信 PS NCCL是Nvidia Collective multi-GPU Communication Library的简称,它是一个实现多GPU的collective communication通信(all-gather, red

    2024年02月10日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包