在pytorch的DDP原生代码使用的基础上,ray和accelerate两个库对于pytorch并行训练的代码使用做了更加友好的封装。
以下为极简的代码示例。
ray
ray.py
#coding=utf-8
import os
import sys
import time
import numpy as np
import torch
from torch import nn
import torch.utils.data as Data
import ray
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
import onnxruntime
# the code below uses an AI model to simulate linear regression; formula: y = x1 * w1 + x2 * w2 + b
# --- DDP RAY --- #
# model structure
class LinearNet(nn.Module):
    """Minimal regression network: a single nn.Linear producing one output."""

    def __init__(self, n_feature):
        super().__init__()
        self.linear = nn.Linear(n_feature, 1)  # n_feature inputs -> 1 scalar

    def forward(self, x):
        out = self.linear(x)
        return out
# whole train task (runs inside each Ray Train worker process)
def train_task():
    """Train a 2-feature linear-regression model under Ray DDP.

    Builds a synthetic dataset for y = x1*w1 + x2*w2 + b (+ small noise),
    wraps the DataLoader and model with Ray Train helpers so batches are
    sharded across workers and moved to the right device, runs SGD, and
    saves the unwrapped model weights to "ddp_ray_model.pt".
    """
    print("--- train_task, pid: ", os.getpid())
    # device setting
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("device:", device)
    # Use the public API instead of torch._utils._get_all_device_indices(),
    # which is private and can disappear between torch releases.
    device_ids = list(range(torch.cuda.device_count()))
    print("device_ids:", device_ids)
    if not device_ids:
        print("invalid device_ids, exit")
        return
    # prepare data: y = x1*w1 + x2*w2 + b plus gaussian noise (std 0.01)
    num_inputs = 2
    num_examples = 1000
    true_w = [2, -3.5]
    true_b = 3.7
    features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)
    labels = (true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
              + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float))
    # load data
    batch_size = 10
    dataset = Data.TensorDataset(features, labels)
    data_iter = Data.DataLoader(dataset, batch_size, shuffle=True)
    for X, y in data_iter:
        print(X, y)  # peek at one batch for debugging
        break
    # let Ray shard the loader across workers and place batches on the device
    data_iter = ray.train.torch.prepare_data_loader(data_iter)
    # model define and init; prepare_model wraps it for distributed training
    model = LinearNet(num_inputs)
    ddp_model = ray.train.torch.prepare_model(model)
    print(ddp_model)
    # cost function
    loss = nn.MSELoss()
    # optimizer
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.03)
    # train
    num_epochs = 6
    for epoch in range(1, num_epochs + 1):
        batch_count = 0
        sum_loss = 0.0
        for X, y in data_iter:
            output = ddp_model(X)
            # view(-1, 1) matches the label vector to the (batch, 1) output
            batch_loss = loss(output, y.view(-1, 1))
            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()
            batch_count += 1
            sum_loss += batch_loss.item()
        print('epoch %d, avg_loss: %f' % (epoch, sum_loss / batch_count))
    # save model: .module unwraps the DDP container so the checkpoint
    # loads back into a plain LinearNet
    print("save model, pid: ", os.getpid())
    torch.save(ddp_model.module.state_dict(), "ddp_ray_model.pt")
def ray_launch_task():
    """Run train_task on 2 GPU workers via Ray's TorchTrainer."""
    scaling = ScalingConfig(num_workers=2, use_gpu=True)
    trainer = TorchTrainer(train_loop_per_worker=train_task, scaling_config=scaling)
    trainer.fit()
def predict_task():
    """Load the checkpoint saved by train_task and run one prediction.

    Rebuilds LinearNet, restores weights from "ddp_ray_model.pt", and
    prints the model's prediction next to the true label for one sample.
    """
    print("--- predict_task")
    # prepare data (same generating process as training)
    num_inputs = 2
    num_examples = 20
    true_w = [2, -3.5]
    true_b = 3.7
    features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)
    labels = (true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
              + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float))
    model = LinearNet(num_inputs)
    model.load_state_dict(torch.load("ddp_ray_model.pt"))
    model.eval()
    x, y = features[6], labels[6]
    # inference only: no autograd graph needed
    with torch.no_grad():
        pred_y = model(x)
    print("x:", x)
    print("y:", y)
    # BUG FIX: the original printed y again here instead of the prediction
    print("pred_y:", pred_y)
if __name__ == "__main__":
    # Script entry point: report environment info, then launch distributed
    # training through Ray. Uncomment predict_task() to run inference after
    # a checkpoint exists.
    print("==== task begin ====")
    print(f"python version: {sys.version}")
    print(f"torch version: {torch.__version__}")
    print(f"model name: {LinearNet.__name__}")
    ray_launch_task()
    # predict_task()
    print("==== task end ====")
accelerate
acc.py
#coding=utf-8
import os
import sys
import time
import numpy as np
from accelerate import Accelerator
import torch
from torch import nn
import torch.utils.data as Data
import onnxruntime
# the code below uses an AI model to simulate linear regression; formula: y = x1 * w1 + x2 * w2 + b
# --- accelerate --- #
# model structure
class LinearNet(nn.Module):
    """Single-layer linear model: y = W x + b with one output unit."""

    def __init__(self, n_feature):
        super().__init__()
        # fully-connected layer mapping n_feature inputs to a scalar
        self.linear = nn.Linear(n_feature, 1)

    def forward(self, x):
        return self.linear(x)
# whole train task (launched via `accelerate launch acc.py`)
def train_task():
    """Train the linear-regression model with HuggingFace Accelerate.

    accelerator.prepare() moves the model, optimizer, and dataloader to the
    configured device(s) and wraps the model for multi-process training.
    The trained, unwrapped model is pickled to "acc_model.pt" by the main
    process only.
    """
    print("--- train_task, pid: ", os.getpid())
    # device setting
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("device:", device)
    # Public replacement for the private torch._utils._get_all_device_indices().
    device_ids = list(range(torch.cuda.device_count()))
    print("device_ids:", device_ids)
    if not device_ids:
        print("invalid device_ids, exit")
        return
    # prepare data: y = x1*w1 + x2*w2 + b plus gaussian noise (std 0.01)
    num_inputs = 2
    num_examples = 1000
    true_w = [2, -3.5]
    true_b = 3.7
    features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)
    labels = (true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
              + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float))
    # load data
    batch_size = 10
    dataset = Data.TensorDataset(features, labels)
    data_iter = Data.DataLoader(dataset, batch_size, shuffle=True)
    for X, y in data_iter:
        print(X, y)  # peek at one batch for debugging
        break
    # model define and init
    model = LinearNet(num_inputs)
    # cost function
    loss = nn.MSELoss()
    # optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=0.03)
    accelerator = Accelerator()
    # automatically move model and data to gpu as configured
    model, optimizer, data_iter = accelerator.prepare(model, optimizer, data_iter)
    # train
    num_epochs = 3
    for epoch in range(1, num_epochs + 1):
        batch_count = 0
        sum_loss = 0.0
        for X, y in data_iter:
            output = model(X)
            # view(-1, 1) matches the label vector to the (batch, 1) output
            batch_loss = loss(output, y.view(-1, 1))
            optimizer.zero_grad()
            # accelerator.backward handles gradient scaling/sync for us
            accelerator.backward(batch_loss)
            optimizer.step()
            batch_count += 1
            sum_loss += batch_loss.item()
        print('epoch %d, avg_loss: %f' % (epoch, sum_loss / batch_count))
    # save model
    # BUG FIX: unwrap before saving — under multi-process launch `model` is a
    # DDP wrapper, and pickling the wrapper makes the checkpoint unloadable
    # outside the distributed context. Save from the main process only so
    # workers don't race on the same file.
    if accelerator.is_main_process:
        torch.save(accelerator.unwrap_model(model), "acc_model.pt")
def predict_task():
    """Load the model pickled by train_task and run one prediction.

    Prints the prediction next to the true label for one sample.
    """
    print("--- predict_task")
    # prepare data (same generating process as training)
    num_inputs = 2
    num_examples = 20
    true_w = [2, -3.5]
    true_b = 3.7
    features = torch.tensor(np.random.normal(0, 1, (num_examples, num_inputs)), dtype=torch.float)
    labels = (true_w[0] * features[:, 0] + true_w[1] * features[:, 1] + true_b
              + torch.tensor(np.random.normal(0, 0.01, size=num_examples), dtype=torch.float))
    # NOTE(review): this loads a full pickled model; only load checkpoints
    # from trusted sources (pickle can execute arbitrary code).
    model = torch.load("acc_model.pt")
    model.eval()
    x, y = features[6], labels[6]
    # inference only: no autograd graph needed
    with torch.no_grad():
        pred_y = model(x)
    print("x:", x)
    print("y:", y)
    # BUG FIX: the original printed y again here instead of the prediction
    print("pred_y:", pred_y)
if __name__ == "__main__":
    # launch method: use command line
    # for example
    # accelerate launch ACC.py
    print(f"python version: {sys.version}")
    print(f"torch version: {torch.__version__}")
    print(f"model name: {LinearNet.__name__}")
    train_task()
    predict_task()
    print("==== task end ====")
到了这里,关于pytorch基于ray和accelerate实现多GPU数据并行的模型加速训练的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!