分布式并行训练(DP、DDP、DeepSpeed)

这篇具有很好参考价值的文章主要介绍了分布式并行训练(DP、DDP、DeepSpeed)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

[pytorch distributed] 01 nn.DataParallel 数据并行初步

  • 数据并行 vs. 模型并行
    • 数据并行:模型拷贝(per device),数据 split/chunk(对batch切分)

      • 每个device上都拷贝一份完整模型,每个device分别处理1个batch的一部分(如batch_size=64, 2个device, 每device处理32个样本)
      • 梯度反向传播时,每个设备上的梯度求和(求和才是一个完整batch所有样本的loss),汇入中心设备/参数服务器(默认gpu0)对模型进行梯度优化。
    • 模型并行:数据拷贝(per device),模型 split/chunk(显然是单卡放不下模型的情况下)

  • DP => DDP
    • DPnn.DataParallel (不推荐)
      • https://pytorch.org/docs/stable/generated/torch.nn.DataParallel.html
    • DDP: DistributedDataParallel (推荐)
    • Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel.

1. 数据并行DP(nn.DataParallel)

预先定义一下Dataset和Model

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        # 100*5
        self.data = torch.randn(length, size)
    def __getitem__(self, index):
        # (5, )
        return self.data[index]
    def __len__(self):
        # 100
        return self.len

class Model(nn.Module):
    # Our model
    def __init__(self, input_size, output_size):
        # 5 => 2
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)
    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(),
              "output size", output.size())
        return output


input_size = 5  # 模型输入数据维度(b,n) = (30, 5)
output_size = 2  # 模型输出数据维度(b,n) = (30, 2)

batch_size = 30  # batch size
data_size = 100  # 数据集样本数量

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),
                         batch_size=batch_size, 
                         shuffle=True)
# 构造优化器和损失函数
optimizer = optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()

# 模拟目标值
target = torch.randn(64, 5) 

step1: 并行化包裹模型

# Parameters and DataLoaders                    
# (5, 2)
model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:  # 如果不止1张GPU 
	# 构建数据并行模型
	device_ids = [0, 1]  # 使用的设备ID列表
    # 如3张GPU,dim = 0,[30, xxx] -> [15, ...], [15, ...] on 2 GPUs
    model = nn.DataParallel(model, device_ids)  # 并行化,默认使用所有device加载数据
  • torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
    • model= 指传入的模型
    • device_ids=None,
      • 参与训练的 GPU 有哪些,device_ids=gpus,默认None是使用全部device;
    • output_device=None
      • 指定中心设备(参数服务器),用于汇总梯度的 GPU 是哪个,output_device=gpus[0]
    • dim=0
      • 从那一维度进行数据切分,默认batch维度
  • 在执行 forward/backward 之前,使用 DataParallel 将 model 复制到 device_ids 指定设备上,进行数据并行处理
    • model.to('cuda:0')
    • 不同的是tensor的to(device)是在device上生成一个拷贝,不改变原来cpu上的tensor;而model是直接将原model转移到gpu上。

step2:加载到device0

设置中心设备(参数服务器),用于反向传播时的梯度汇总,一般指定cuda:0

# 将模型从cpu放在gpu 0上 
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') 
model.to(device)

step3:forward前向传播

模型forward时,将data_loader加载的一个batch的数据进行切分,送入不同device的模型进行计算,再将结果合并输出。

for data in rand_loader:
    # input_var can be on any device, including CPU
    input = data.to(device)
#     input = data
    output = model(input)
    print("Outside: input size", input.size(),
          "output_size", output.size())
"""
	In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
	In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])
	Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])
"""

step4:反向传播梯度聚合

loss.backward()分别在每个device上计算loss的梯度,average_gradients(model)将梯度聚合到中心设备/参数服务器(cuda:0)上,进行梯度优化

	# 在每个设备上进行前向传播和梯度计算
	loss = criterion(output, target)
	loss.backward()
	
	# 对各个设备上的梯度进行求和
	average_gradients(model)
	
	# 使用原始设备模型进行梯度优化
	optimizer.step()

2. 分布式数据并行DDP(nn.parallel.DistributedDataParallel)

multiple GPUs in a single machine/server/node:单机多卡

  • 分布式数据并行时,模型(model parameters)/优化器(optimizer states)每张卡都会拷贝一份(replicas)
    • DDP 始终在卡间维持着模型参数和优化器状态的同步一致性在整个训练过程中;
  • Data Parallel,一个batch的数据通过 DistributedSampler 切分split 分发到不同的 gpus 上
    • 此时虽然模型/optimizer 相同,但因为每个device的数据输入不同,导致 loss 不同,反向传播时计算到的梯度也会不同
    • 此时 ddp 通过 ring all-reduce algorithm ,保证每个batch step结束后不同卡间model/optimizer 的同步一致性

分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

  • 如上图所示,Ring all-reduce algorithm
    • 首先会将所有的 gpu cards 连成一个 ring环
    • 其同步过程,不需要等待所有的卡都计算完一轮梯度,
    • 经过这个同步过程之后,所有的卡的 models/optimizers 就都会保持一致的状态;

分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

  • Ring all-reduce algorithm 计算和同步的几个过程
    • 红线:GPUs 分别计算损失(forward)和梯度(backward)
    • 蓝线:梯度的聚合到中心device/参数服务器上(gpu0)
    • 绿线:(模型/优化器)参数的更新及广播(broadcast);

其实参数服务器可以是一个GPU0,也可以是CPU,也可以是所有GPU:
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式
但将数据发送到GPU0会成为device通信的瓶颈:
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

所以采用环形的梯度聚合方式更加高效:
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

2.1 DDP基本概念

  • world

    • world 表示包含所有进程的组(所有gpu的集合)。
    • 每个进程通常对应一个 GPU, world 中的进程可以相互通信,这使得使用分布式数据并行(Distributed Data Parallel, DDP)进行训练成为可能。
  • world_size(gpu个数/进程个数):

    • world_size 表示分布式训练环境中的总进程数/gpu数。
    • 每个进程都会被分配一个唯一的标识符(rank),从 0 到 world_size-1。
  • rank(进程标识符):

    • rank 是分配给world每个进程的唯一标识符,用于标识每个进程在分布式训练中的角色。
    • local rank是分配个单个node每个进程的标识符,world中可能有多个node。
  • node(节点):

    • node 可以理解为一个服务器,代表着物理设备上的一个实体。
    • 在多机分布式训练中,每台机器被视为一个节点,节点之间需要进行通信。
    • 例如,如果有2 个node/server,每个 node/server/machine 各有4张卡(4 gpus)。total_world_size = 2(节点数) * 4(每个节点的 GPU 数量)= 8, rank 的取值范围为 [0, 1, 2, 3, 4, 5, 6, 7], local_rank 的取值范围为 [0, 1, 2, 3],[0, 1, 2, 3] 分别对应着不同的节点上的进程。
  • All to one:聚合过程(reduce),所有GPU(model和optiminizer状态)汇聚到参数服务器;

  • one to All:广播过程(broadcast),参数服务器广播到所有GPU;

2.2 不用torchrun

按照下面的脚本:

python DDP_script_no_torchrun.py
step1:导入相关的包
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler  # 分发数据
from torch.nn.parallel import DistributedDataParallel as DDP  # 包装model使之数据并行
from torch.distributed import init_process_group, destroy_process_group
step2:ddp_setup函数

在不用trochrun时,这个函数用于设置分布式训练的环境(对rank号GPU进行初始化)。它调用了init_process_group函数来初始化进程组,在分布式情况下,使用backend(后端)是nccl完成GPU间通信(NVIDIA Collective Communication Library),然后使用torch.cuda.set_device函数,根据环境变量设置当前进程使用的GPU设备。

def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # rank 0 process
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    # nccl:NVIDIA Collective Communication Library 
    # 分布式情况下的,gpus 间通信
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)
step3:Trainer类

这个类定义了一个模型训练的封装器。在初始化方法中,它接收一个模型backend、一个训练数据加载器train_dataloader、一个优化器train_dataloader作为参数,并将模型移动到GPU上,然后使用DistributedDataParallel对模型进行包装,以实现数据并行。(model先放cuda再DDP封装)

分布式体现在哪?原本单GPU执行 每个epoch下,Len(data_loader)=2048Batchsize=32,一个GPU计算的数据量 Steps = Len(data_loader) / Batchsize,即2048/32=64个;而使用8个GPU时,每个GPU计算的数据量Steps = Len(data_loader) / Batchsize / world_size,即2048/32/8=64/8=8个,单个GPU吞吐的数据变少,BatchSize就可以调大了。

_run_batch方法实现了一次批量的训练过程,包括前向传播、计算损失、反向传播和更新参数。_run_epoch方法用于遍历整个训练集进行训练,self.train_dataloader.sampler.set_epoch(epoch)是用于设置数据加载器的epoch,以保证每个GPU在每个epoch开始时加载的数据都是不同的。train方法则用于控制训练的总体流程。

class Trainer:
    def __init__(self, 
                 model: torch.nn.Module, 
                 train_dataloader: DataLoader, 
                 optimizer: torch.optim.Optimizer, 
                 gpu_id: int) -> None:
        self.gpu_id = gpu_id  # rank
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer      
        # 对模型进行wrap 
        self.model = model.to(self.gpu_id) 
        self.model = DDP(model, device_ids=[self.gpu_id])  # 每张卡都会维护一个model
        
    def _run_batch(self, xs, ys):
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = F.cross_entropy(output, ys)
        loss.backward()
        self.optimizer.step()
    
    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        # 打印在哪个GPU上跑的哪个epoch (Steps = Len(data_loader) / Batchsize / world_size)
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
        # 每个epoch对数据划分的方式不同
        self.train_dataloader.sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)
    
    def train(self, max_epoch: int):
        for epoch in range(max_epoch):
            self._run_epoch(epoch)
step4:MyTrainDataset类

这个类定义了一个自定义的训练数据集。在初始化方法中,它接收一个大小参数,并生成一组随机的数据样本。__len__方法返回数据集的大小,__getitem__方法用于获取指定索引处的数据样本。

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size
    
    def __getitem__(self, index):
        return self.data[index]
step5:main函数

这个函数是程序的主函数。在函数内部,首先调用了ddp_setup函数来设置分布式训练的环境。

然后创建了一个自定义的训练数据集和相应的数据加载器,以及一个线性模型和一个优化器。DistributedSampler是PyTorch提供的一个分布式采样器,用于确保每个进程加载的数据都是不同的且顺序随机。sampler对象被传入训练数据集的构造函数,可以通过数据加载器(如torch.utils.data.DataLoader)的sampler参数指定。在每个进程中,DistributedSampler会根据进程ID和进程数量,将整个训练数据集划分成多个部分,并为每个进程提供其应加载的数据索引。这样,在分布式训练过程中,每个进程只会加载自己负责的数据部分,避免了数据重复加载。

接下来,创建了一个Trainer对象,并调用其train方法进行模型训练。最后调用destroy_process_group函数销毁进程组。

def main(rank: int, world_size: int, max_epochs: int, batch_size: int):
    # register ddp
    ddp_setup(rank, world_size)
    
    train_dataset = MyTrainDataset(2048)
    train_dataloader = DataLoader(train_dataset, 
                              batch_size=batch_size, 
                              pin_memory=True, 
                              shuffle=False, 
                              # batch input: split to each gpus (且没有任何 overlaping samples 各个 gpu 之间)
                              sampler=DistributedSampler(train_dataset))
    model = torch.nn.Linear(20, 1)
    optimzer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
    trainer = Trainer(model=model, optimizer=optimzer, train_dataloader=train_dataloader)
    trainer.train(max_epochs)
    
    destroy_process_group()
step6:解析命令行参数并运行主函数

在这个步骤中,首先使用argparse模块解析命令行参数,包括最大训练周期数max_epochs和批量大小batch_size。然后调用main函数,并将解析后的参数传递给它进行模型训练。

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--max_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
    
	world_size = torch.cuda.device_count()
    # 启动 world_size 个进程(每个进程对应一个GPU)执行train函数
    mp.spawn(main, args=(world_size, args.max_epochs, args.batch_size), nprocs=world_size)
    # mp.spawn传入函数名train
    # train()的第一个参数 rank 由 mp 的 id 自动指定
    # train()的其他个参数 由args传入
    # nprocs=world_size指定进程数量

2.3 用torchrun

torchrun运行分布式train.py脚本。

torchrun --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32
  • torchrun的参数:nproc-per-node设置每个node服务器上的gpu个数(一般是1个服务器下的gpu个数)
  • python脚本的参数:ddp_gpus_torchrun.py脚本名称,--max_epochs 5 --batch_size 32脚本参数。

实现batch_size不变的情况下,对step的切分
(如单卡情况下,data_len=1024,batch_size=32,则一个gpu的step=1024/32=32
(多卡情况下2个gpu,data_len=1024,batch_size=32,则每个gpu的step=(1024/32)/2=32/2=16

使用torchrun的情况下,不需要指定rankworld_size,把rank替换为int(os.environ['LOCAL_RANK'])即可:

import os, sys
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

def ddp_setup():
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    # rank 0 process
#     os.environ["MASTER_ADDR"] = "localhost"
#     os.environ["MASTER_PORT"] = "12355"
    # nccl:NVIDIA Collective Communication Library 
    # 分布式情况下的,gpus 间通信
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))
    
class Trainer:
    def __init__(self, 
                 model: torch.nn.Module, 
                 train_dataloader: DataLoader, 
                 optimizer: torch.optim.Optimizer, 
                 ) -> None:
        self.gpu_id = int(os.environ['LOCAL_RANK'])
        self.model = model.to(self.gpu_id)
        self.train_dataloader = train_dataloader
        self.optimizer = optimizer
        self.model = DDP(model, device_ids=[self.gpu_id])
        

    
    def _run_batch(self, xs, ys):
        self.optimizer.zero_grad()
        output = self.model(xs)
        loss = F.cross_entropy(output, ys)
        loss.backward()
        self.optimizer.step()
    
    def _run_epoch(self, epoch):
        batch_size = len(next(iter(self.train_dataloader))[0])
        print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')
        self.train_dataloader.sampler.set_epoch(epoch)
        for xs, ys in self.train_dataloader:
            xs = xs.to(self.gpu_id)
            ys = ys.to(self.gpu_id)
            self._run_batch(xs, ys)
    
    def train(self, max_epoch: int):
        for epoch in range(max_epoch):
            self._run_epoch(epoch)

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size
    
    def __getitem__(self, index):
        return self.data[index]

def main(max_epochs: int, batch_size: int):
    ddp_setup()
    
    train_dataset = MyTrainDataset(2048)
    train_dataloader = DataLoader(train_dataset, 
                              batch_size=batch_size, 
                              pin_memory=True, 
                              shuffle=False, 
                              # batch input: split to each gpus (且没有任何 overlaping samples 各个 gpu 之间)
                              sampler=DistributedSampler(train_dataset))
    model = torch.nn.Linear(20, 1)
    optimzer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
    trainer = Trainer(model=model, optimizer=optimzer, train_dataloader=train_dataloader)
    trainer.train(max_epochs)
    
    destroy_process_group()

    
if __name__ == '__main__':
    
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('--max_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()
    
#    world_size = torch.cuda.device_count()
    main(args.max_epochs, args.batch_size)

3. 模型并行

  • 数据并行是切数据(scattering inputs and gathering outputs),模型并行是切模型(shards);
    • 模型并行单卡放不下一份模型;
    • 将一份大模型,不同的层切分到不同的卡上,forward时串行执行;

3.1 Huggingface实现

  • device_mapHuggingface支持自动实现模型并行
    • device_map参数的取值["auto", "balanced", "balanced_low_0", "sequential"]
    • auto的模型分割优先级:GPU(s) > CPU (RAM) > Disk

如下,如果有两个gpu,device_map="auto"使模型的2个layers的parameter分别加载到两张gpu上(各一半):

from transformers import LlamaTokenizer, LlamaForCausalLM, GenerationConfig
model = LlamaForCausalLM.from_pretrained("decapoda-research/llama-7b-hf",
    load_in_8bit=True,
    device_map="auto",
)
for i, para in enumerate(model.named_parameters()):
#     print(f'{i}, {para[0]}\t {para[1].device} \t{para[1].dtype}')
    print(f'{i}, \t {para[1].device} \t{para[1].dtype}')`

3.2 to(device)手动实现

模型并行,卡间串行,时间换空间。

pytorch模拟Huggingface的模型并行原理:分别用to(device),将不同的layers加载到不同的gpu上,forward时将data也加载到对应gpu!!(weight*data之前需要保证两个tensor在相同的device)。

import torch
import torch.nn as nn
import torch.optim as optim

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = torch.nn.Linear(10000, 10).to('cuda:0')
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

    def forward(self, x):
        # 卡间串行执行
        x = self.net1(x.to('cuda:0')))
        x = self.net2(self.relu(x.to('cuda:1'))
        return x

实例化模型,将其参数加载到2个GPU之后:

model = ToyModel()

分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

进行一个batch的train:每个batch_size=20样本,5分类。labelpred计算loss之前也要统一device!

loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()
outputs = model(torch.randn(20, 10000))
labels = torch.randn(20, 5).to('cuda:1')
loss_fn(outputs, labels).backward()
optimizer.step()

3.3 accelerate实现

accelerate需要将准备四种主要类型的对象:models (torch.nn.Module)、optimizers (torch.optim.Optimizer)、dataloaders (torch.data.dataloader.DataLoader)、scheduler(可选),一起传递给 prepare()方法。

Dataloader必须是torch.data.dataloader.DataLoader,否则会每张卡都把整个数据集全部加载一遍,就失去ddp的意义了。 ?还是说Accelerate设置batchsize=1时,每张卡都把整个数据集全部加载一遍,就失去ddp的意义了。

大佬解答一下吧

看这3篇文章:

  • https://zhuanlan.zhihu.com/p/605640431
  • https://zhuanlan.zhihu.com/p/606061177
  • https://zhuanlan.zhihu.com/p/462453622
  • https://zhuanlan.zhihu.com/p/668646528

4. Deepspeed

DeepSpeed:炼丹小白居家旅行必备【神器】
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

技术栈
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

术语:其实和前面DDP的概念一样。

分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

Train的数据4部分组成:model模型参数backward的梯度gradientoptimizer优化器参数forward的数据tensor
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

Deepspeed、ZeRO技术方案分发Partitioning(按gpu数量N等分数据)、卸载Offload(不用的数据放入CPU)、模型并行Pipeline(模型参数按层切分到不同gpu上)
分布式并行训练(DP、DDP、DeepSpeed),Pytorch学习,分布式

step1:deepspeed初始化

# init distributed
deepspeed.init_distributed()

加载参数local_rank

def parse_arguments():
    import argparse
    parser = argparse.ArgumentParser(description='deepspeed training script.')
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')
    # Include DeepSpeed configuration arguments
    parser = deepspeed.add_config_arguments(parser)
    args = parser.parse_args()
    return args

step2:deepspeed封装模型和数据集

deepspeed.initialize()封装model和dataset,相当于将模型和数据集交给deepspeed进行托管,engine就是deepspeed封装后的model,其他返回值同样都是deepspeed封装过的。(其中optimizer和lr_scheduler 后面是用不到的),我们只需要模型engine数据加载器training_dataloader

还要传入一个deepspeed的配置文件deepspeed_config

# init model
model = MyClassifier(3, 100, ch_multi=128)
# init dataset
ds = MyDataset((3, 512, 512), 100, sample_count=int(1e6))

# init engine
engine, optimizer, training_dataloader, lr_scheduler = deepspeed.initialize(
    args=args,
    model=model,
    model_parameters=model.parameters(),
    training_data=ds,
    config=deepspeed_config,
)
# load checkpoint
engine.load_checkpoint("./data/checkpoints/MyClassifier/")

step3:训练

在使用DeepSpeed进行分布式训练时,通常不需要手动调用optimizer.zero_grad()来清零梯度。DeepSpeed会自动处理梯度累积和梯度清零的操作,无需手动调用zero_grad()。

当使用DeepSpeed进行分布式训练时,一般会在engine.backward(loss)之后调用engine.step()来执行梯度更新操作。在engine.step()中,DeepSpeed会执行优化器的step()方法来更新模型参数,并在必要的时候自动清零梯度,以便进行下一轮的反向传播。

engine.train()
    for step, (data, label) in enumerate(training_dataloader):
        step += 1
        data= data.to(device=engine.device, dtype=torch.float16)  # x
        label = label.to(device=engine.device, dtype=torch.long).reshape(-1)  # y
		
		# 不需要梯度清零optimizer.zero_grad()
        outputs = engine(data)  # forward
        loss = F.cross_entropy(outputs, label )
        engine.backward(loss)
        engine.step()

单机节点node多卡gpu运行

deepspeed \
    --launcher_args "source ${PWD}/setup_env.sh" \
    --hostfile hostfile \
    deepspeed_script.py \
    --deepspeed \
    --deepspeed_config "$PWD/deepspeed_config.json"

deepspeed_config.json文章来源地址https://www.toymoban.com/news/detail-729516.html

{
    "train_micro_batch_size_per_gpu": 1,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 0.001,
            "betas": [
                0.8,
                0.999
            ],
            "eps": 1e-08,
            "weight_decay": 3e-07
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 0.001,
            "warmup_num_steps": 1000
        }
    },
    "activation_checkpointing": {
        "partition_activations": true,
        "cpu_checkpointing": true,
        "contiguous_memory_optimization": false,
        "number_checkpoints": null,
        "synchronize_checkpoint_boundary": false,
        "profile": true
    },
    "fp16": {
        "enabled": true,
        "auto_cast": false,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "consecutive_hysteresis": false,
        "min_loss_scale": 1
    },
    "zero_optimization": {
        "stage": 3,
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "contiguous_gradients": true,
        "overlap_comm": true
    }
}

到了这里,关于分布式并行训练(DP、DDP、DeepSpeed)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【深入了解PyTorch】PyTorch分布式训练:多GPU、数据并行与模型并行

    在深度学习领域,模型的复杂性和数据集的巨大规模使得训练过程变得极具挑战性。为了加速训练过程,利用多个GPU进行并行计算是一种常见的方法。PyTorch作为一种流行的深度学习框架,提供了强大的分布式训练工具,使得多GPU、数据并行和模型并行等技术变得更加容易实现

    2024年02月12日
    浏览(29)
  • torch分布式数据并行:torch.nn.parallel.DistributedDataParallel(DDP),代码书写步骤

    多进程做多卡训练; 目录 1 初始化进程组: 2 当前进程所能用到的GPU卡的名称 3 将数据集随机分配到不同的GPU上 4 将train_sampler传入DataLoader中 5 将数据进行拷贝 6 模型放到GPU上 7 执行命令 8 模型保存 9 加载模型 10 注意事项 代码编写流程: 1 初始化进程组: ‘nccl’ 指定GPU之

    2024年02月15日
    浏览(28)
  • PyTorch数据并行(DP/DDP)浅析

    一直以来都是用的单机单卡训练模型,虽然很多情况下已经足够了,但总有一些情况得上分布式训练: 模型大到一张卡放不下; 单张卡batch size不敢设太大,训练速度慢; 当你有好几张卡,不想浪费; 展示一下技术 。 由于还没遇到过一张显卡放不下整个模型的情况,本文的

    2024年02月02日
    浏览(24)
  • 用通俗易懂的方式讲解大模型分布式训练并行技术:MOE并行

    前面的文章中讲述了数据并行、流水线并行、张量并行、序列并行、自动并行等多种并行技术。但现在的模型越来越大,训练样本越来越多,每个样本都需要经过模型的全部计算,这就导致了训练成本的平方级增长。 而当我们希望在牺牲极少的计算效率的情况下,把模型规模

    2024年02月02日
    浏览(53)
  • mmdetection使用指定的显卡号并行分布式训练

    后面的不用看了,直接看最省事版本:                 直接用CUDA_VISIBLE_DEVICES=\\\"2,3\\\"指定多卡就可以,也可以给sh文件传参进去。但是,切记!切记!切记! sh文件里不能有空行,尤其是使用反斜杠 连接多行的时候 ,我一开始尝试指定多卡不起作用,就是因为图美观手

    2024年02月08日
    浏览(25)
  • Pytorch实现多GPU并行训练(DDP)

    Pytorch实现并行训练通常有两个接口: DP(DataParallel) 和 DDP(DistributedDataParallel) 。目前 DP(DataParallel) 已经被Pytorch官方deprecate掉了,原因有二:1, DP(DataParallel) 只支持单机多卡,无法支持多机多卡;2, DP(DataParallel) 即便在单机多卡模式下效率也不及 DDP(Distributed

    2024年02月11日
    浏览(80)
  • Pytorch 多卡并行(3)—— 使用 DDP 加速 minGPT 训练

    前文 并行原理简介和 DDP 并行实践 和 使用 torchrun 进行容错处理 在简单的随机数据上演示了使用 DDP 并行加速训练的方法,本文考虑一个更加复杂的 GPT 类模型,说明如何进行 DDP 并行实战 MinGPT 是 GPT 模型的一个流行的开源 PyTorch 复现项目,其实现简洁干净可解释,因而颇具

    2024年02月09日
    浏览(32)
  • 【分布式训练】基于PyTorch进行多GPU分布式模型训练(补充)

    简介: 在PyTorch中使用DistributedDataParallel进行多GPU分布式模型训练。 原文链接:https://towardsdatascience.com/distributed-model-training-in-pytorch-using-distributeddataparallel-d3d3864dc2a7 随着以ChatGPT为代表的大模型的不断涌现,如何在合理的时间内训练大模型逐渐成为一个重要的研究课题。为了解

    2024年02月16日
    浏览(36)
  • pytorch 分布式训练

    分布式训练分为这几类: 按照并行方式来分:模型并行 vs 数据并行 按照更新方式来分:同步更新 vs 异步更新 按照算法来分:Parameter Server算法 vs AllReduce算法 这个函数主要有三个参数: module:即模型,此处注意,虽然输入数据被均分到不同gpu上,但每个gpu上都要拷贝一份模

    2024年02月12日
    浏览(32)
  • TensorFlow、PyTorch分布式训练

    要在两台主机之间使用分布式训练,您可以使用一些深度学习框架提供的工具和库来实现。 这里以TensorFlow为例,介绍一下如何在两台主机之间使用分布式训练。 首先,您需要安装TensorFlow和CUDA等相关软件,并确保两台主机都可以访问彼此。然后,您需要在代码中使用TensorF

    2024年02月07日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包