torch分布式通信基础

这篇具有很好参考价值的文章主要介绍了torch分布式通信基础。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


官网文档:WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

1. 点到点通信

torch分布式通信基础,分布式训练,分布式,torch

# 同步,peer-2-peer数据传递
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_send_recv_sync(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        dist.send(tensor=tensor, dst=1) # 需要指定dst,发送的目标
    else:
        dist.recv(tensor=tensor, src=0) # 需要指定src,从哪儿接收
    print('Rank ', rank, ' has data ', tensor[0])

# 异步
def test_send_recv_async(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        req = dist.isend(tensor=tensor, dst=1)
    else:
        req = dist.irecv(tensor=tensor, src=0)
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29598'
    dist.init_process_group(backend, rank=rank, world_size=size)
    #test_send_recv_sync(rank, size)
    test_send_recv_async(rank, size)

if __name__ == "__main__":
    size = 2
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

2. 集群通信

torch分布式通信基础,分布式训练,分布式,torch
torch分布式通信基础,分布式训练,分布式,torch
torch分布式通信基础,分布式训练,分布式,torch

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def test_broadcast(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor += 2
    else:
      tensor += 1
    dist.broadcast(tensor=tensor,src=0) # src指定broad_cast的源
    print("******test_broadcast******")
    print('Rank ', rank, ' has data ', tensor) # 结果都是 2

def test_scatter(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
      tensor_list = [torch.tensor([1.0]), torch.tensor([2.0]), torch.tensor([3.0]), torch.tensor([4.0])]
      dist.scatter(tensor, scatter_list = tensor_list, src = 0)
    else:
      dist.scatter(tensor, scatter_list = [], src = 0)
    print("******test_scatter******")
    print('Rank ', rank, ' has data ', tensor) # 结果是[[1], [2], [3], [4]]

def test_reduce(rank, size):
    tensor = torch.ones(1)
    dist.reduce(tensor=tensor, dst=0) # dst指定哪个进程进行reduce, 默认操作是加法
    print("******test_reduce******")
    print('Rank ', rank, ' has data ', tensor)

def test_all_reduce(rank, size):
    tensor = torch.ones(1)
    dist.all_reduce(tensor=tensor,op=dist.ReduceOp.SUM)
    print("******test_all_reduce******")
    print('Rank ', rank, ' has data ', tensor)  # 结果都是 4

def test_gather(rank, size):
    tensor = torch.ones(1)
    if rank == 0:
      output = [torch.zeros(1) for _ in range(size)]
      dist.gather(tensor, gather_list=output, dst=0)
    else:
      dist.gather(tensor, gather_list=[], dst=0)
    if rank == 0:
      print("******test_gather******")
      print('Rank ', rank, ' has data ', output)  # 结果是 [[1,1,1,1]]

def test_all_gather(rank, size):
    output = [torch.zeros(1) for _ in range(size)]
    tensor = torch.ones(1)
    dist.all_gather(output, tensor)
    print("******test_all_gather******")
    print('Rank ', rank, ' has data ', output)  # 结果都是 [1,1,1,1]

def init_process(rank, size, backend='gloo'):
    """ 这里初始化分布式环境,设定Master机器以及端口号 """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29596'
    dist.init_process_group(backend, rank=rank, world_size=size)
    test_reduce(rank, size)
    test_all_reduce(rank, size)
    test_gather(rank, size)
    test_all_gather(rank, size)
    test_broadcast(rank, size)
    test_scatter(rank, size)

if __name__ == "__main__":
    size = 4
    processes = []
    mp.set_start_method("spawn")
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

需要注意的一点是:
这里面的调用都是同步的,可以理解为,每个进程都调用到通信api时,真正的有效数据传输才开始,然后通信完成之后,代码继续往下跑。实际上有些通信进程并不获取数据,这些进程可能并不会被阻塞。

文档最后,提供了一个简单的类似 DDP 的实现,里面核心的部分就是:
torch分布式通信基础,分布式训练,分布式,torch
torch分布式通信基础,分布式训练,分布式,torch
这也进一步阐释了DDP的核心逻辑:
反向计算完成之后,汇总梯度信息(求均值),然后再更新参数文章来源地址https://www.toymoban.com/news/detail-568986.html

到了这里,关于torch分布式通信基础的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • torch分布式数据并行:torch.nn.parallel.DistributedDataParallel(DDP),代码书写步骤

    多进程做多卡训练; 目录 1 初始化进程组: 2 当前进程所能用到的GPU卡的名称 3 将数据集随机分配到不同的GPU上 4 将train_sampler传入DataLoader中 5 将数据进行拷贝 6 模型放到GPU上 7 执行命令 8 模型保存 9 加载模型 10 注意事项 代码编写流程: 1 初始化进程组: ‘nccl’ 指定GPU之

    2024年02月15日
    浏览(28)
  • 【分布式训练】基于PyTorch进行多GPU分布式模型训练(补充)

    简介: 在PyTorch中使用DistributedDataParallel进行多GPU分布式模型训练。 原文链接:https://towardsdatascience.com/distributed-model-training-in-pytorch-using-distributeddataparallel-d3d3864dc2a7 随着以ChatGPT为代表的大模型的不断涌现,如何在合理的时间内训练大模型逐渐成为一个重要的研究课题。为了解

    2024年02月16日
    浏览(36)
  • 【分布式】大模型分布式训练入门与实践 - 04

    【分布式】NCCL部署与测试 - 01 【分布式】入门级NCCL多机并行实践 - 02 【分布式】小白看Ring算法 - 03 【分布式】大模型分布式训练入门与实践 - 04 数据并行(Distributed Data Parallel)是一种用于加快深度学习模型训练速度的技术。在过去,训练大型模型往往受限于单卡训练的瓶颈

    2024年02月08日
    浏览(30)
  • pytorch 进行分布式调试debug torch.distributed.launch 三种方式

    一. pytorch 分布式调试debug torch.distributed.launch 三种方式 1. 方式1:ipdb调试(建议) 参考之前的博客:python调试器 ipdb 注意:pytorch 分布式调试只能使用侵入式调试,也即是在你需要打断点的地方(或者在主程序的第一行)添加下面的代码: 当进入pdb调试后,跟原先使用pdb调试

    2024年02月07日
    浏览(25)
  • pytorch 分布式训练

    分布式训练分为这几类: 按照并行方式来分:模型并行 vs 数据并行 按照更新方式来分:同步更新 vs 异步更新 按照算法来分:Parameter Server算法 vs AllReduce算法 这个函数主要有三个参数: module:即模型,此处注意,虽然输入数据被均分到不同gpu上,但每个gpu上都要拷贝一份模

    2024年02月12日
    浏览(32)
  • TensorFlow、PyTorch分布式训练

    要在两台主机之间使用分布式训练,您可以使用一些深度学习框架提供的工具和库来实现。 这里以TensorFlow为例,介绍一下如何在两台主机之间使用分布式训练。 首先,您需要安装TensorFlow和CUDA等相关软件,并确保两台主机都可以访问彼此。然后,您需要在代码中使用TensorF

    2024年02月07日
    浏览(34)
  • 大语言模型的分布式训练

    什么是大语言模型 训练方式 面临的挑战 什么是分布式计算 如何实现 拆分逻辑 分发逻辑 大语言模型的分布式训练 数据并行 模型并行 流水线并行 张量并行 通信 PS NCCL是Nvidia Collective multi-GPU Communication Library的简称,它是一个实现多GPU的collective communication通信(all-gather, red

    2024年02月10日
    浏览(35)
  • 大模型学习笔记08——分布式训练

    模型规模的扩大,对硬件(算力、内存)的发展提出要求。然而,因为内存墙的存在,单一设备的算力及容量,受限于物理定律,持续提高芯片的集成越来越困难,难以跟上模型扩大的需求。 为了解决算力增速不足的问题,人们考虑用多节点集群进行分布式训练,以提升算力

    2024年01月23日
    浏览(33)
  • ROS:分布式通信

    ROS是一个分布式计算环境。一个运行中的ROS系统可以包含分布在多台计算机上多个节点。根据系统的配置方式,任何节点可能随时需要与任何其他节点进行通信。 因此,ROS对网络配置有某些要求: 所有端口上的所有机器之间必须有完整的双向连接。 每台计算机必须通过所有

    2024年02月12日
    浏览(28)
  • 分布式通信方式

      分布式通信是指在分布式系统中,不同节点之间进行消息传递和交互的方式。   以下是常见的分布式通信方式:   使用消息队列作为中间件,节点之间通过发送和接收消息来实现通信。消息队列提供了异步、解耦和可靠性的通信机制,常见的消息队列系统包括Rabb

    2024年02月15日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包