前言
最近一直在做类ChatGPT项目的部署 微调,关注比较多的是两个:一个LLaMA,一个ChatGLM,会发现有不少模型是基于这两个模型去做微调的,说到微调,那具体怎么微调呢,因此又详细了解了一下微调代码,发现微调LLM时一般都会用到Hugging face实现的Transformers库的Trainer类
从而发现,如果大家想从零复现ChatGPT,便得从实现Transformer开始,因此便开启了本文:从零实现Transformer的简易版与强大版:从300多行到3000多行,主要分为两个大部分
- 参考harvard对transformer的实现,按照Transformer每一步的原理逐步逐行从零实现,先编码器后解码器,特别是注意力机制(缩放点积、多头注意力)
- 从头到尾解读Hugging face实现的Transformers库的整体代码架构,及逐行解读每一行代码,而网上没有关于这个Transformers库的代码解读
且本文的代码解读与其他代码解读最大的不同是:会对出现在本文的每一行代码都加以注释、解释、说明,甚至对每行代码中的变量都会做解释/说明
总之,一如既往的保持对初学者的足够友好,让即便没有太多背景知识的也能顺畅理解本文
第一部分 从零实现Transformer编码器模块
transformer强大到什么程度呢,基本是17年之后绝大部分有影响力模型的基础架构都基于的transformer(比如,这里有200来个,包括且不限于基于decode的GPT、基于encode的BERT、基于encode-decode的T5等等)
通过博客内的这篇文章《Transformer通俗笔记:从Word2Vec、Seq2Seq逐步理解到GPT、BERT》,我们已经详细了解了transformer的原理(如果忘了,建议必复习下再看本文,当然,如果你实在不想跳转,就只想呆在本文,也行,我努力..)
如果把上图中的各种细节也显示出来,则如下大图所示(此大图来源于七月在线NLP11里倪老师讲的Transformer模型源码解读,positional encoding、多头等没画)
具体说来,是一个典型的编码器-解码器架构
# 定义一个基于 nn.Module 的编码器-解码器类
class EncoderDecoder(nn.Module):
# 初始化方法,接收编码器、解码器、源嵌入、目标嵌入和生成器作为参数
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
# 调用 nn.Module 的初始化方法
super(EncoderDecoder, self).__init__()
self.encoder = encoder # 将传入的编码器实例保存为类属性
self.decoder = decoder # 将传入的解码器实例保存为类属性
self.src_embed = src_embed # 将传入的源嵌入实例保存为类属性
self.tgt_embed = tgt_embed # 将传入的目标嵌入实例保存为类属性
self.generator = generator # 将传入的生成器实例保存为类属性
# 前向传播方法,接收源序列、目标序列和它们的掩码作为参数
def forward(self, src, tgt, src_mask, tgt_mask):
# 对源序列进行编码,并将编码结果与掩码传递给解码器进行解码
return self.decode(self.encode(src, src_mask), src_mask,
tgt, tgt_mask)
# 编码方法,接收源序列和掩码作为参数
def encode(self, src, src_mask):
# 将源序列进行嵌入,然后将嵌入后的序列和源序列掩码传给编码器
return self.encoder(self.src_embed(src), src_mask)
# 解码方法,接收编码器输出(memory)、源序列掩码、目标序列和目标序列掩码作为参数
def decode(self, memory, src_mask, tgt, tgt_mask):
# 将目标序列进行嵌入,然后将嵌入后的序列、编码器输出、源序列掩码和目标序列掩码传给解码器
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
# 定义一个基于 nn.Module 的生成器类
class Generator(nn.Module):
# 初始化方法,接收模型维度(d_model)和词汇表大小(vocab)作为参数
def __init__(self, d_model, vocab):
# 调用 nn.Module 的初始化方法
super(Generator, self).__init__()
# 定义一个线性层,将模型的输出维度映射到词汇表大小
self.proj = nn.Linear(d_model, vocab)
# 前向传播方法,接收输入 x
def forward(self, x):
# 将输入 x 传入线性层,然后对输出应用 log-softmax 激活函数(在最后一个维度上)
return F.log_softmax(self.proj(x), dim=-1)
考虑到Hugging face实现的Transformers库虽然功能强大,但3000多行,对于初次实现的初学者来说,理解难度比较大,因此,咱们一步步结合对应的原理来逐行编码实现一个简易版的transformer
1.1 关于输入的处理:针对输入做embedding,然后加上位置编码
为了方便后面代码的编写,先引入一些库
import numpy as np # 导入NumPy库,用于进行矩阵运算和数据处理
import torch # 导入PyTorch库,用于构建神经网络及相关操作
import torch.nn as nn # 导入PyTorch神经网络模块,用于构建神经网络层
import torch.nn.functional as F # 导入PyTorch神经网络函数库,用于激活函数、损失函数等
import math, copy, time # 导入数学库、复制库和时间库,用于各种数学计算、复制操作和计时
from torch.autograd import Variable # 从PyTorch自动微分库中导入Variable类,用于构建自动微分计算图
import matplotlib.pyplot as plt # 导入Matplotlib的pyplot模块,用于绘制图表和可视化
import seaborn # 导入Seaborn库,用于绘制统计图形和美化图表
seaborn.set_context(context="talk") # 设置Seaborn的上下文环境,设置图表的尺寸和标签字体大小等
%matplotlib inline # IPython魔术命令,使Matplotlib绘制的图形直接显示在Notebook内
1.1.1 针对输入做embedding
对于模型来说,每一句话比如“七月的服务真好,答疑的速度很快”,在模型中都是一个词向量,但如果每句话都临时抱佛脚去生成对应的词向量,则处理起来无疑会费时费力,所以在实际应用中,我们会事先预训练好各种embedding矩阵,这些embedding矩阵包含常用领域常用单词的向量化表示,且提前做好分词
维度1 | 维度2 | 维度3 | 维度4 | ... | 维度512 | |
教育 | ||||||
机构 | ||||||
在线 | ||||||
课程 | ||||||
.. | ||||||
服务 | ||||||
答疑 | ||||||
老师 |
从而当模型接收到“七月的服务真好,答疑的速度很快”这句输入时,便可以从对应的embedding矩阵里查找对应的词向量,最终把整句输入转换成对应的向量表示
这部分的代码 可以如下表示
# 定义一个名为Embeddings的类,继承自PyTorch的nn.Module类
class Embeddings(nn.Module):
# 初始化Embeddings类
def __init__(self, d_model, vocab):
# 调用父类nn.Module的初始化方法
super(Embeddings, self).__init__()
# 创建一个词嵌入层,参数为词汇表大小和词嵌入维度
self.lut = nn.Embedding(vocab, d_model)
# 将词嵌入维度保存为类属性
self.d_model = d_model
# 定义前向传播方法
def forward(self, x):
# 通过词嵌入层将输入的单词编码为向量,并乘以词嵌入维度的平方根进行缩放
return self.lut(x) * math.sqrt(self.d_model)
1.1.2 位置编码的实现
关于位置编码的通透理解,请参阅此文《一文通透位置编码:从标准位置编码到旋转位置编码RoPE》
最终,再通过下面这两行代码完美实现位置编码
# 使用正弦和余弦函数生成位置编码,对于d_model的偶数索引,使用正弦函数;对于奇数索引,使用余弦函数。
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
1.2 经过「embedding + 位置编码」后乘以三个权重矩阵得到三个向量Q K V
从下图可知,经过「embedding + 位置编码」得到的输入,会乘以「三个权重矩阵: 」得到查询向量Q、键向量K、值向量V(你可以简单粗暴的理解为弄出来了三个分身)
举个例子,针对「我想吃酸菜鱼」这句话,经过embedding + 位置编码后,可得(注:可以512维,也可以是768维,但由于transformer论文中作者设置的512维,所以除了这个酸菜鱼的例子暂为768维外,其他地方均统一为512维)
然后乘以三个权重矩阵得
为此,我们可以先创建4个相同的线性层,每个线性层都具有 d_model 的输入维度和 d_model 的输出维度
self.linears = clones(nn.Linear(d_model, d_model), 4)
前三个线性层分别用于对 Q向量、K向量、V向量进行线性变换(至于这第4个线性层在随后的第3点)
1.3 对输入和Multi-Head Attention做Add&Norm,再对上步输出和Feed Forward做Add&Norm
我们聚焦下transformer论文中原图的这部分,可知,输入通过embedding+位置编码后,先后做以下两个步骤
- 针对query向量做multi-head attention,得到的结果与原query向量,做相加并归一化
这个相加具体是怎么个相加法呢?事实上,Add代表的Residual Connection(残差连接),是为了解决多层神经网络训练困难的问题,通过将前一层的信息无差的传递到下一层,可以有效的仅关注差异部分,这一方法之前在图像处理结构如ResNet等中常常用到 具体编码时通过 SublayerConnection 函数实现此功能attention = self.attention(query, key, value, mask) output = self.dropout(self.norm1(attention + query))
而Norm则代表了Layer Normalization,通过对层的激活值的归一化,可以加速模型的训练过程,使其更快的收敛,编码时用 LayerNorm 函数实现"""一个残差连接(residual connection),后面跟着一个层归一化(layer normalization)操作""" class SublayerConnection(nn.Module): # 初始化函数,接收size(层的维度大小)和dropout(dropout率)作为输入参数 def __init__(self, size, dropout): super(SublayerConnection, self).__init__() # 调用父类nn.Module的构造函数 self.norm = LayerNorm(size) # 定义一个层归一化(Layer Normalization)操作,使用size作为输入维度 self.dropout = nn.Dropout(dropout) # 定义一个dropout层 # 定义前向传播函数,输入参数x是输入张量,sublayer是待执行的子层操作 def forward(self, x, sublayer): # 将残差连接应用于任何具有相同大小的子层 # 首先对输入x进行层归一化,然后执行子层操作(如self-attention或前馈神经网络) # 接着应用dropout,最后将结果与原始输入x相加。 return x + self.dropout(sublayer(self.norm(x)))
"""构建一个层归一化(layernorm)模块""" class LayerNorm(nn.Module): # 初始化函数,接收features(特征维度大小)和eps(防止除以零的微小值)作为输入参数 def __init__(self, features, eps=1e-6): super(LayerNorm, self).__init__() # 调用父类nn.Module的构造函数 self.a_2 = nn.Parameter(torch.ones(features)) # 定义一个大小为features的一维张量,初始化为全1,并将其设置为可训练参数 self.b_2 = nn.Parameter(torch.zeros(features)) # 定义一个大小为features的一维张量,初始化为全0,并将其设置为可训练参数 self.eps = eps # 将防止除以零的微小值eps保存为类实例的属性 # 定义前向传播函数,输入参数x是输入张量 def forward(self, x): mean = x.mean(-1, keepdim=True) # 计算输入x在最后一个维度上的均值,保持输出结果的维度 std = x.std(-1, keepdim=True) # 计算输入x在最后一个维度上的标准差,保持输出结果的维度 # 对输入x进行层归一化,使用可训练参数a_2和b_2进行缩放和偏移,最后返回归一化后的结果 return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
- 上面步骤得到的『输出结果output做feed forward』之后,再与『上面步骤的原输出结果output』也做相加并归一化
forward = self.feed_forward(output) block_output = self.dropout(self.norm2(forward + output)) return block_output
最终这个编码器层代码可以完整的写为
"""编码器(Encoder)由自注意力(self-attention)层和前馈神经网络(feed forward)层组成"""
class EncoderLayer(nn.Module):
# 初始化函数,接收size(层的维度大小)、self_attn(自注意力层实例)
# feed_forward(前馈神经网络实例)和dropout(dropout率)作为输入参数
def __init__(self, size, self_attn, feed_forward, dropout):
super(EncoderLayer, self).__init__() # 调用父类nn.Module的构造函数
self.self_attn = self_attn # 将自注意力层实例保存为类实例的属性
self.feed_forward = feed_forward # 将前馈神经网络实例保存为类实例的属性
# 创建两个具有相同参数的SublayerConnection实例(用于残差连接和层归一化)
self.sublayer = clones(SublayerConnection(size, dropout), 2)
self.size = size # 将层的维度大小保存为类实例的属性
def forward(self, x, mask):
# 先对输入x进行自注意力操作
# 然后将结果传递给第一个SublayerConnection实例(包括残差连接和层归一化)
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
# 将上一步的输出传递给前馈神经网络
# 然后将结果传递给第二个SublayerConnection实例(包括残差连接和层归一化),最后返回结果
return self.sublayer[1](x, self.feed_forward)
1.3.1 缩放点积注意力(Scaled Dot-Product Attention)
接下来,先看下缩放点积注意力(Scaled Dot-Product Attention)的整体实现步骤
- 为了计算每个单词与其他单词之间的相似度,会拿「每个单词/token的q向量」与「包括自身在内所有单词/token的k向量」一一做点积(两个向量之间的点积结果可以代表两个向量的相似度) 对应到矩阵的形式上,则是矩阵Q与K矩阵的转置做相乘
还是拿上面那个例子:「我想吃酸菜鱼」,则Q乘以K的转置如下图所示 最终得到的矩阵有6行6列,从上往下逐行来看的话,每一个格子里都会有一个数值,每一个数值依次代表:
单词我与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度,比如可能是0.3 0.2 0.2 0.1 0.1 0.1,代表编码1时放在「我 想 吃 酸 菜 鱼」上面的注意力大小
同时,可以看到模型在对当前位置的信息进行编码时,会过度的将注意力集中于自身的位置(当然 这无可厚非,毕竟自己与自己最相似嘛),而可能忽略了其它位置。很快你会看到,作者采取的一种解决方案就是采用多头注意力机制(Multi-Head Attention)
想与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
吃与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
酸与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
菜与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度
鱼与「我 想 吃 酸 菜 鱼」各自的点积结果或相似度 - 由于会随着dimension的增大而增大,为避免过大,所以除以 ,相当于对点积的结果做下缩放 其中,是向量的维度,且,如果只设置了一个头,那就是模型的维度,如果设置了8个头,则,且如果模型的维度是512维,则即等于8
上面两步的代码可以如下编写# torch.matmul是PyTorch库提供的矩阵乘法函数 # 具体操作即是将第一个矩阵的每一行与第二个矩阵的每一列进行点积(对应元素相乘并求和),得到新矩阵的每个元素 scores = torch.matmul(query, key.transpose(-2, -1)) \ / math.sqrt(d_k)
- 接着使用 Softmax 计算每一个单词对包括自身在内所有单词的 Attention值,这些值加起来的和为1(相当于起到了归一化的效果) 这步对应的代码为
# 对 scores 进行 softmax 操作,得到注意力权重 p_attn p_attn = F.softmax(scores, dim = -1)
- 最后再乘以矩阵,即对所有values(v1 v2 v3 v4),根据不同的attention值( ),做加权平均 对应到我想吃酸菜鱼这个例子上,则是
- 最终得到单词的输出,如下图所示(图中V矩阵的4行分别代表v1 v2 v3 v4): 上述两步对应的代码为
# 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出 return torch.matmul(p_attn, value), p_attn
同样的方法,也可以计算出,如下图8所示, b2就是拿q2去对其他的key做attention,最后再与其他的value值相乘取weighted sum得到,最终每个单词都包含了上下文相关单词的语义信息,不再只是attention计算之前,每个单词只有它自己的信息,和上下文没有关联
另外,这里面还有一点值得注意的是,可能有同学疑问:当我们计算x1与x2、x3、x4的相似度之后,x2会再与x1、x3、x4再依次计算一遍相似度,这两个过程中,前者算过了x1和x2的相似度,后者则再算一遍x2与x1的相似度,这不是重复计算么?其实不然,这是两码事,原因很简单,正如你喜欢一个人 你会觉得她对你很重要,但那个人不一定喜欢你 她不会觉得你对她有多重要..
最终,Scaled Dot-Product Attention这部分对应的完整代码可以写为
'''计算“缩放点积注意力'''
# query, key, value 是输入的向量组
# mask 用于遮掩某些位置,防止计算注意力
# dropout 用于添加随机性,有助于防止过拟合
def attention(query, key, value, mask=None, dropout=None):
d_k = query.size(-1) # 获取 query 向量的最后一个维度的大小,即词嵌入的维度
# 计算 query 和 key 的点积,并对结果进行缩放,以减少梯度消失或爆炸的可能性
scores = torch.matmul(query, key.transpose(-2, -1)) \
/ math.sqrt(d_k)
# 如果提供了 mask,根据 mask 对 scores 进行遮掩
# 遮掩的具体方法就是设为一个很大的负数比如-1e9,从而softmax后 对应概率基本为0
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
# 对 scores 进行 softmax 操作,得到注意力权重 p_attn
p_attn = F.softmax(scores, dim = -1)
# 如果提供了 dropout,对注意力权重 p_attn 进行 dropout 操作
if dropout is not None:
p_attn = dropout(p_attn)
# 用注意力权重 p_attn 对 value 向量进行加权求和,得到最终的输出
return torch.matmul(p_attn, value), p_attn
1.3.2 多头注意力(Multi-Head Attention)
先看2个头的例子,依然还是通过生成对应的三个矩阵、、,然后这三个矩阵再各自乘以两个转移矩阵得到对应的分矩阵,如
- 矩阵对应的两个分矩阵、
- 矩阵对应的两个分矩阵为、
- 矩阵对应的两个分矩阵为、
至于同理,也生成对应的6个分矩阵、、、、、
接下来编码时,分两步
- 先与做点积然后乘以,然后再与做点积再乘以,再把这两个计算的结果相加得到
- 再分别与做点积然后乘以、然后再与做点积再乘以,再把这两个计算的结果相加得到
如果是8个头呢,计算步骤上也是一样的,只是从2个头变化到8个头而已,最终把每个头得到的结果直接concat,最后经过一个linear变换,得到最终的输出,整体如下所示
这部分Multi-Head Attention的代码可以写为
'''代码来自nlp.seas.harvard.edu,我针对每一行代码、甚至每行代码中的部分变量都做了详细的注释/解读'''
class MultiHeadedAttention(nn.Module):
# 输入模型的大小(d_model)和注意力头的数量(h)
def __init__(self, h, d_model, dropout=0.1):
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0 # 确保 d_model 可以被 h 整除
# 我们假设 d_v(值向量的维度)总是等于 d_k(键向量的维度)
self.d_k = d_model // h # 计算每个注意力头的维度
self.h = h # 保存注意力头的数量
self.linears = clones(nn.Linear(d_model, d_model), 4) # 上文解释过的四个线性层
self.attn = None # 初始化注意力权重为 None
self.dropout = nn.Dropout(p=dropout) # 定义 dropout 层
# 实现多头注意力的前向传播
def forward(self, query, key, value, mask=None):
if mask is not None:
# 对所有 h 个头应用相同的 mask
mask = mask.unsqueeze(1)
nbatches = query.size(0) # 获取 batch 的大小
# 1) 批量执行从 d_model 到 h x d_k 的线性投影
query, key, value = \
[l(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for l, x in zip(self.linears, (query, key, value))]
# 2) 在批量投影的向量上应用注意力
# 具体方法是调用上面实现Scaled Dot-Product Attention的attention函数
x, self.attn = attention(query, key, value, mask=mask,
dropout=self.dropout)
# 3) 使用 view 函数进行“拼接concat”,然后做下Linear变换
x = x.transpose(1, 2).contiguous() \
.view(nbatches, -1, self.h * self.d_k)
return self.linears[-1](x) # 返回多头注意力的输出
1.3.3 Position-wise前馈网络的实现
在上文,咱们逐一编码实现了embedding、位置编码、缩放点积/多头注意力,以及Add和Norm,整个编码器部分还剩最后一个模块,即下图框里的Feed Forward Network(简称FFN)
其中包括两个线性变换:维度上先扩大后缩小,最终输入和输出的维数为,内层的维度为,过程中使用ReLU作为激活函数
虽然线性变换在不同位置上是相同的,但它们在层与层之间使用不同的参数,相当于使用了两个内核大小为1的卷积
这部分的代码可以如下编写
‘’‘定义一个名为PositionwiseFeedForward的类,继承自nn.Module’‘’
class PositionwiseFeedForward(nn.Module):
# 文档字符串:实现FFN方程
# 初始化方法,接受三个参数:d_model,d_ff和dropout(默认值为0.1)
def __init__(self, d_model, d_ff, dropout=0.1):
# 调用父类nn.Module的初始化方法
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff) # 定义一个全连接层,输入维度为d_model,输出维度为d_ff
self.w_2 = nn.Linear(d_ff, d_model) # 定义一个全连接层,输入维度为d_ff,输出维度为d_model
self.dropout = nn.Dropout(dropout) # 定义一个dropout层,dropout概率为传入的dropout参数
# 定义前向传播方法,接受一个输入参数x
def forward(self, x):
# 将输入x通过第一个全连接层w_1后,经过ReLU激活函数,再通过dropout层,最后通过第二个全连接层w_2,返回最终结果
return self.w_2(self.dropout(F.relu(self.w_1(x))))
1.4 对整个transformer block复制N份最终成整个encode模块
N可以等于6或其他数值
class Encoder(nn.Module): # 定义一个名为Encoder的类,它继承了nn.Module类
# 一个具有N层堆叠的核心编码器
# 初始化方法,接受两个参数:layer(编码器层的类型)和N(编码器层的数量)
def __init__(self, layer, N):
super(Encoder, self).__init__() # 调用父类nn.Module的初始化方法
self.layers = clones(layer, N) # 创建N个编码器层的副本,并将其赋值给实例变量self.layers
self.norm = LayerNorm(layer.size) # 创建一个LayerNorm层,并将其赋值给实例变量self.norm
# 定义前向传播方法,接受两个参数:x(输入数据)和mask(掩码)
def forward(self, x, mask):
# 文档字符串:解释本方法的功能是将输入(及其掩码)依次传递给每一层
for layer in self.layers: # 遍历self.layers中的每一个编码器层
x = layer(x, mask) # 将输入x和mask传递给当前编码器层,并将输出结果赋值给x
return self.norm(x) # 对最终的输出x应用LayerNorm层,并将结果返回
其中的clone函数的代码为
def clones(module, N):
"Produce N identical layers."
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
第二部分 从零实现Transformer解码器模块
咱们再回顾下transformer的整个模型架构,特别是解码器的部分,毕竟BERT外,GPT等很有影响力的模型都用的transformer decode结构
从底至上,
- 输入包括2部分,下方是前一个time step的输出的embedding
再加上一个表示位置的Positional Encoding - 接着是Masked Multi-Head Self-attention,masked字面意思是屏蔽 然后做一下Add&Norm
- 再往上是一个不带mask的Multi-Head Attention层,它的Key、Value矩阵使用 Encoder 的编码信息矩阵,而Query使用上一个 Decoder block 的输出计算
然后再做一下Add&Norm - 继续往上,经过一个FFN层,也做一下Add&Norm
- 最后做下linear变换后,通过Softmax 层计算下一个翻译单词的概率
由于在第一部分介绍过了embedding、positional encoding、FFN、Add&Norm、linear、softmax、multi-head attention,故本部分只重点介绍下Masked Multi-Head Self-attention
2.1 Masked Multi-Head Self-attention
本过程和第一部分介绍的Multi-Head self-attention基本一致,区别在于加了个mask机制
- 输入经过embedding + 位置编码之后,还是乘以三个不同的权重矩阵:、、,依次得到三个不同的矩阵输入:Q、K、V
- Q矩阵乘以K矩阵的转置,得到,注意,紧接着会再乘以一个Mask矩阵,得到Masked Attention矩阵
- Masked Attention矩阵经过softmax后,乘以V矩阵得到矩阵
- 最终把、拼接之后,再做一个linear变换得到最终的矩阵
2.2 transformer解码器架构与整体编码-解码架构的实现
整个解码器架构的代码可以如下编写『有一点值得注意的是,如下文代码中所述
- 在对输入x执行自注意力计算并进行第一个子层的处理(带mask),最后一个参数是tgt_mask,即x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
- 但对输入x执行源注意力计算并进行第二个子层的处理时(不带mask),最后一个参数是src_mask,即x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask)) 』
# 定义DecoderLayer类,继承自PyTorch的nn.Module类
class DecoderLayer(nn.Module):
# 初始化方法,接收五个参数:size, self_attn, src_attn, feed_forward, dropout
# 调用父类nn.Module的初始化方法
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
super(DecoderLayer, self).__init__()
# 将size赋值给实例变量self.size
self.size = size
# 将self_attn赋值给实例变量self.self_attn
self.self_attn = self_attn
# 将src_attn赋值给实例变量self.src_attn
self.src_attn = src_attn
# 将feed_forward赋值给实例变量self.feed_forward
self.feed_forward = feed_forward
# 使用SublayerConnection类创建三个子层,并存储到实例变量self.sublayer中
self.sublayer = clones(SublayerConnection(size, dropout), 3)
# 定义前向传播方法,接收四个参数:x, memory, src_mask, tgt_mask
def forward(self, x, memory, src_mask, tgt_mask):
# 将memory赋值给局部变量m
m = memory
# 对输入x执行自注意力计算并进行第一个子层的处理
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
# 对输入x执行源注意力计算并进行第二个子层的处理
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
# 对输入x执行前馈神经网络计算并进行第三个子层的处理,然后返回结果
return self.sublayer[2](x, self.feed_forward)
且Decoder也是由N=6个相同层组成
class Decoder(nn.Module):
"Generic N layer decoder with masking."
def __init__(self, layer, N):
super(Decoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
最终,整个transformer完整模型的整体封装代码为
def make_model(src_vocab, tgt_vocab, N=6,
d_model=512, d_ff=2048, h=8, dropout=0.1):
"Helper: Construct a model from hyperparameters."
c = copy.deepcopy
attn = MultiHeadedAttention(h, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
position = PositionalEncoding(d_model, dropout)
model = EncoderDecoder(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
Decoder(DecoderLayer(d_model, c(attn), c(attn),
c(ff), dropout), N),
nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
Generator(d_model, tgt_vocab))
# This was important from their code.
# Initialize parameters with Glorot / fan_avg.
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform(p)
return model
# Small example model.
tmp_model = make_model(10, 10, 2)
None
2.3 编码器与解码器的协同
当我们把编码器和解码器组合到一起后,看下它两是如何一块协作的
需要注意的是
- Encoder中的Q、K、V全部来自于上一层单元的输出
而Decoder只有Q来自于上一个Decoder单元的输出,K与V都来自于Encoder最后一层的输出。也就是说,Decoder是要通过当前状态与Encoder的输出算出权重后(计算query与各个key的相似度),最后将Encoder的编码加权得到下一层的状态
比如当我们要把“Hello Word”翻译为“你好,世界”时
Decoder会计算“你好”这个query分别与“Hello”、“Word”这两个key的相似度
很明显,“你好”与“Hello”更相似,从而给“Hello”更大的权重,从而把“你好”对应到“Hello”,达到的效果就是“Hello”翻译为“你好” - 且在解码器中因为加了masked机制,自注意力层只允许关注已输出位置的信息,实现方法是在自注意力层的softmax之前进行mask,将未输出位置的权重设置为一个非常大的负数(进一步softmax之后基本变为0,相当于直接屏蔽了未输出位置的信息)
第三部分 Transformer的整个训练过程:预处理与迭代
3.1 预处理阶段:创建词汇表
具体实现时,先创建批次和掩码
class Batch:
def __init__(self, src, trg=None, pad=0):
self.src = src # 输入数据源(通常为源语言)
self.src_mask = (src != pad).unsqueeze(-2) # 创建源语言的掩码,用于忽略填充部分
if trg is not None: # 如果目标语言数据存在
self.trg = trg[:, :-1] # 目标语言数据,去掉最后一个词
self.trg_y = trg[:, 1:] # 目标语言数据,去掉第一个词
self.trg_mask = \
self.make_std_mask(self.trg, pad) # 创建目标语言的掩码,用于忽略填充部分和未来词汇
self.ntokens = (self.trg_y != pad).data.sum() # 计算目标语言中非填充词的数量
@staticmethod
def make_std_mask(tgt, pad):
"Create a mask to hide padding and future words."
tgt_mask = (tgt != pad).unsqueeze(-2) # 创建目标语言的掩码,用于忽略填充部分
tgt_mask = tgt_mask & Variable(
subsequent_mask(tgt.size(-1)).type_as(tgt_mask.data)) # 使用子掩码屏蔽未来词汇
return tgt_mask # 返回完整的目标语言掩码
其中,subsequent_mask的实现如下所示
def subsequent_mask(size):
"Mask out subsequent positions."
attn_shape = (1, size, size)
subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
return torch.from_numpy(subsequent_mask) == 0
3.2 训练三部曲:随机初始化、损失函数、反向传播
接下来,我们创建一个通用的训练和得分函数来跟踪损失。我们传入一个通用的损失计算函数,它也处理参数更新
def run_epoch(data_iter, model, loss_compute):
start = time.time() # 记录当前时间
total_tokens = 0 # 初始化总tokens计数
total_loss = 0 # 初始化总损失
tokens = 0 # 初始化tokens计数
# 遍历数据集中的每个批次
for i, batch in enumerate(data_iter):
# 对每个批次进行前向传播
out = model.forward(batch.src, batch.trg,
batch.src_mask, batch.trg_mask)
# 计算每个批次的损失
loss = loss_compute(out, batch.trg_y, batch.ntokens)
# 累加损失
total_loss += loss
total_tokens += batch.ntokens # 累加tokens
tokens += batch.ntokens # 累加tokens
# 每50个批次进行一次日志记录
if i % 50 == 1:
elapsed = time.time() - start # 计算已用时间
# 输出当前批次,损失和每秒处理的tokens
print("Epoch Step: %d Loss: %f Tokens per Sec: %f" %
(i, loss / batch.ntokens, tokens / elapsed))
start = time.time() # 重置开始时间
tokens = 0 # 重置tokens计数
return total_loss / total_tokens # 返回平均损失
下面这段代码定义了一个名为 SimpleLossCompute 的类,实现了简单的损失计算和训练函数
- 在调用该类的实例时,输入预测输出、目标输出和规范化因子,计算损失值并进行梯度更新
- 如果提供了优化器,还会更新模型参数和清空梯度缓存
# 定义 SimpleLossCompute 类,实现简单的损失计算和训练函数
class SimpleLossCompute:
# 初始化 SimpleLossCompute 类的实例
def __init__(self, generator, criterion, opt=None):
self.generator = generator # 生成器,用于预测输出
self.criterion = criterion # 损失函数,如交叉熵损失
self.opt = opt # 优化器,如 Adam
# 定义调用 SimpleLossCompute 类实例时的操作
def __call__(self, x, y, norm):
x = self.generator(x) # 生成预测输出
# 计算损失,这里需要将预测输出和目标输出转换为合适的形状
loss = self.criterion(x.contiguous().view(-1, x.size(-1)),
y.contiguous().view(-1)) / norm
loss.backward() # 计算梯度
if self.opt is not None: # 如果提供了优化器
self.opt.step() # 更新模型参数
self.opt.optimizer.zero_grad() # 清空梯度缓存
return loss.data[0] * norm # 返回损失值乘以规范化因子(实际损失值)
3.2.1 Adam优化器:自动调整学习率并具有动量效应
优化器(optimizer)经常用于在训练过程中更新模型参数以最小化损失函数,而Adam(Adaptive Moment Estimation)是一种常用的优化器,它结合了两种传统优化算法的优点:Momentum和RMSprop
为了通俗易懂地理解Adam,可以将其比作一个赛车手。训练模型就像是找到一辆赛车在赛道上的最佳行驶速度和路径,以达到最快的速度并取得优异的成绩。在这个过程中,速度的调整(即学习率)非常重要
-
首先,Adam像Momentum一样,具有动量效应。这意味着赛车手(模型)会积累动量,使其在下坡时更快,而在上坡时减速。这有助于模型更快地穿越平坦区域,并避免在最低点附近摆动
-
其次,Adam像RMSprop一样,会自适应地调整每个参数的学习率。在我们的赛车比喻中,这就像赛车手会针对每个轮胎的摩擦系数(赛道状况)做出相应的速度调整。这有助于模型更快地收敛到最优解
总之,Adam可以自动调整学习率,并具有动量效应。总的来说,它能帮助我们的“赛车手”在不同的赛道状况下更快地找到最佳行驶速度和路径,从而更快地训练出高效的模型
transformer原始论文便选择的Adam作为优化器,其参数为,和,根据以下公式,我们在训练过程中改变了学习率:
在预热中随步数线性地增加学习速率,并且此后与步数的反平方根成比例地减小它,设置预热步数为4000
我们来看下具体的编码实现。下面这段代码定义了一个名为 NoamOpt 的类,实现了一种自适应学习率调整策略,该策略在训练 Transformer 模型时常用。在训练的前几个步骤(预热期)中,学习率会线性增长,之后学习率会随着步数的增加而逐渐降低。这种策略有助于模型在训练初期更快地收敛,同时在训练后期保持较低的学习率,有利于模型的稳定训练。
# 定义 NoamOpt 类,实现自适应学习率调整策略
class NoamOpt:
# 初始化 NoamOpt 类的实例
def __init__(self, model_size, factor, warmup, optimizer):
self.optimizer = optimizer # 优化器对象(如 Adam)
self._step = 0 # 记录优化步数
self.warmup = warmup # 预热步数
self.factor = factor # 缩放因子
self.model_size = model_size # 模型维度大小
self._rate = 0 # 初始学习率
# 更新模型参数和学习率
def step(self):
self._step += 1 # 优化步数加 1
rate = self.rate() # 计算当前学习率
for p in self.optimizer.param_groups: # 更新优化器中的学习率
p['lr'] = rate
self._rate = rate # 存储当前学习率
self.optimizer.step() # 更新模型参数
# 计算当前步数的学习率
def rate(self, step=None):
if step is None: # 如果未提供步数,使用当前步数
step = self._step
return self.factor * \
(self.model_size ** (-0.5) * # 计算学习率公式中的模型维度项
min(step ** (-0.5), step * self.warmup ** (-1.5))) # 计算学习率公式中的最小值项
# 定义用于获取 NoamOpt 类实例的函数
def get_std_opt(model):
return NoamOpt(model.src_embed[0].d_model, 2, 4000,
torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
最后总结一下Transformer的影响力
- OpenAI基于它发展出了GPT,并不断迭代出GPT2、GPT3、GPT3.5及火爆全球的 ChatGPT
- Google则基于它发展出了在ChatGPT出现之前统治NLP各大任务的BERT,多好的青春年华!
第四部分 Hugging face社区实现的Transformers库的整体解读
目前绝大部分有影响力的大模型基本都基于transformer的架构 (这个页面底部可以看到基于transformer的200多个有影响力的模型),既然基于transformer便得实现transformer
- 而上文更多只是为了方便理解原理而做的简易版的实现
- 实际运用时基本都用的Hugging face社区实现的Transformers库 「比如此文的2.2节:Stanford Alpaca的微调拆解——见证LLM微调的一般模式 」: https://github.com/huggingface/transformers/tree/main/src/transformers,功能强大且便捷
然要分析这么一个大库是不容易的,如下图所示,包括分词等等各种功能
且光trainer.py(https://github.com/huggingface/transformers/blob/main/src/transformers/trainer.py)这一个项目文件的实现就有3858行
4.1 逐行解读:3858行的transformers/src/transformers/trainer.py
4.1.1 导入一系列Python/numpy/torch里面的各种库
- # coding=utf-8:这行定义了此脚本文件的编码格式为utf-8
- 2-12. 这些行是关于版权和Apache许可证的声明。代码可以在遵守这些许可证条款的情况下被使用
这是一个模块级的docstring,解释了这个模块的主要功能,即创建一个可以轻松训练或微调HuggingFace Transformers模型的Trainer类 - 15-30. 导入了一些常用的Python标准库,包括对文件、操作系统、时间、警告等的操作,以及一些集合和类型检查的工具
- 33-36. 这里首先导入了和模型训练相关的集成工具。这些工具包括了报告集成回调、超参数、判断fairscale(一个优化PyTorch模型训练的库)是否可用等
- 39-45. 导入了numpy和torch以及其分布式模块,这些是进行深度学习计算的基础库。同时也导入了huggingface_hub的Repository和create_repo,它们是用于与HuggingFace模型Hub进行交互的工具
- 46-51. 导入了torch内的nn模块,以及torch.utils.data模块中的DataLoader, Dataset, RandomSampler, SequentialSampler,这些是用于处理神经网络和数据的基本工具
这行导入了当前模块的版本信息 - 54-87. 这些行导入了许多与模型训练相关的工具和函数,包括预训练模型和配置,数据整理,调试工具,优化器,层标准化,分布式训练工具(比如deepspeed),回调函数等
- 89-132. 这些行导入了一些与训练有关的工具和函数,包括分布式策略,内存跟踪,优化器名称,训练参数等
- 134-173. 这些行导入了一些工具和函数,主要用于处理适配器,配置,权重,日志,数据集,设备检测等
- 175-177. 定义了默认的回调函数列表和默认的进度回调函数
- 179-189. 根据环境的可用性,可能会导入和Notebook, Apex, 数据集, Torch TPU, Fairscale相关的模块
- 191-200. 如果SageMaker模型并行可用,那么导入与其相关的模块,并检查其版本
如果安全张量库可用,就导入它
如果性能分析工具PEFT可用,就导入它 - 206-217. 如果Accelerate可用,那么导入与其相关的模块,并检查其版本
使用TYPE_CHECKING做类型检查,如果是,就导入optuna模块
设置了logger用于日志记录 - 224-230. 定义了一些常量,它们是用于保存训练时的参数,状态,优化器,调度器,梯度缩放器等信息的文件名
4.1.2 定义class Trainer,先做一些初始化设置
然后定义class Trainer,逐一实现了如下函数
- func __init__
硬件配置:代码首先判断是否需要将模型放置在特定的设备(如 GPU 或 CPU)上。一些特殊情况,如使用了模型并行、深度学习库DeepSpeed、完全bf16或fp16评估、数据并行处理和完全分片的数据并行处理,都会对这个决定产生影响。
数据预处理:然后,代码会创建一个用于数据处理的 data_collator,这个 data_collator 会根据是否有分词器(tokenizer)来选择默认的数据整理器。这个整理器将在训练和验证过程中用于整理数据。
优化器与学习率调度器:然后,代码检查了优化器和学习率调度器是否已经设置,并在必要时进行了一些配置。在这里,还进行了一些错误检查,以防模型和优化器参数不在同一个设备上,或者优化器与使用的并行处理库(如Fairscale、Deepspeed或PyTorch FSDP)不兼容。
回调函数:最后,代码初始化了一些默认的回调函数,并在需要时创建了一个远程仓库的克隆和输出目录。这些回调函数将在训练过程中的不同时间点被调用,可以用来做一些自定义的操作,比如在每个 epoch 结束后保存模型。
混合精度设置:代码首先检查是否需要使用混合精度训练(即使用 fp16 或 bf16)。如果需要,根据后端类型(例如 "cuda_amp" 或 "cpu_amp"),选择正确的混合精度训练策略。在这里,也进行了一些错误检查,以防混合精度训练与使用的并行处理库(如SageMaker Model Parallelism)不兼容。
标签平滑:然后,代码检查是否需要使用标签平滑(一种常见的防止过拟合的技巧),并在需要时设置相应的对象。
训练器状态和控制:接下来,代码初始化了训练器的状态和控制对象,这两个对象将在训练过程中用于跟踪训练的进展和控制训练的流程。
其他设置:最后,代码还进行了一些其他的设置,比如初始化内存跟踪器,设置训练批次的大小,以及处理一些特定的训练参数(如 "torch_compile")
- func add_callback
- func pop_callback
- func remove_callback
- func _move_model_to_device
- func _set_signature_columns_if_needed
- func _remove_unused_columns
- func _get_collator_with_removed_columns
4.1.3 训练数据集、验证数据集相关
- func _get_train_sampler
# 获取训练采样器 def _get_train_sampler(self) -> Optional[torch.utils.data.Sampler]: if self.train_dataset is None or not has_length(self.train_dataset): # 如果没有训练数据集或训练数据集没有长度,返回None return None # 创建采样器 if self.args.group_by_length: # 如果参数设定了按长度分组 if is_datasets_available() and isinstance(self.train_dataset, datasets.Dataset): # 如果有datasets库并且训练数据集是datasets.Dataset的实例 lengths = ( self.train_dataset[self.args.length_column_name] if self.args.length_column_name in self.train_dataset.column_names else None ) # 如果训练数据集中有长度列名,获取长度,否则长度为None else: lengths = None # 否则,长度为None model_input_name = self.tokenizer.model_input_names[0] if self.tokenizer is not None else None # 获取模型输入名称 return LengthGroupedSampler( # 返回长度分组采样器 self.args.train_batch_size * self.args.gradient_accumulation_steps, dataset=self.train_dataset, lengths=lengths, model_input_name=model_input_name, ) else: return RandomSampler(self.train_dataset) # 否则,返回随机采样器
- func get_train_dataloader
# 获取训练数据的 DataLoader def get_train_dataloader(self) -> DataLoader: """ 返回训练[`~torch.utils.data.DataLoader`]。 如果`train_dataset`未实现`__len__`,将不使用采样器, 否则,使用适应于分布式训练的随机采样器。 如果想注入一些自定义行为,可以在子类中重写此方法。 """ # 如果训练集为空,则抛出 ValueError if self.train_dataset is None: raise ValueError("Trainer: training requires a train_dataset.") # 创建训练数据集和数据整理器 train_dataset = self.train_dataset data_collator = self.data_collator # 如果训练集是数据集的实例,移除未使用的列 if is_datasets_available() and isinstance(train_dataset, datasets.Dataset): train_dataset = self._remove_unused_columns(train_dataset, description="training") # 否则,使用数据整理器移除未使用的列 else: data_collator = self._get_collator_with_removed_columns(data_collator, description="training") # 定义 DataLoader 参数 dataloader_params = { "batch_size": self._train_batch_size, "collate_fn": data_collator, "num_workers": self.args.dataloader_num_workers, "pin_memory": self.args.dataloader_pin_memory, } # 如果训练集不是迭代的数据集,设定采样器和其他参数 if not isinstance(train_dataset, torch.utils.data.IterableDataset): dataloader_params["sampler"] = self._get_train_sampler() dataloader_params["drop_last"] = self.args.dataloader_drop_last dataloader_params["worker_init_fn"] = seed_worker # 返回由 accelerator 处理过的 DataLoader return self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))
- func _get_eval_sampler
# 获取评估数据的采样器 def _get_eval_sampler(self, eval_dataset: Dataset) -> Optional[torch.utils.data.Sampler]: # 废弃的代码 if self.args.use_legacy_prediction_loop: # 如果是在TPU上运行,返回 SequentialDistributedSampler if is_torch_tpu_available(): return SequentialDistributedSampler( eval_dataset, num_replicas=xm.xrt_world_size(), rank=xm.get_ordinal() ) # 如果是在Sagemaker多处理器环境中运行,返回SequentialDistributedSampler elif is_sagemaker_mp_enabled(): return SequentialDistributedSampler( eval_dataset, num_replicas=smp.dp_size(), rank=smp.dp_rank(), batch_size=self.args.per_device_eval_batch_size, ) # 其他情况下,返回顺序采样器 else: return SequentialSampler(eval_dataset) # 如果是单机环境,返回顺序采样器;否则,返回 None if self.args.world_size <= 1: return SequentialSampler(eval_dataset) else: return None
- func get_eval_dataloader
# 获取评估数据的 DataLoader def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoader: """ 返回评估[`~torch.utils.data.DataLoader`]。 如果想注入一些自定义行为,可以在子类中重写此方法。 Args: eval_dataset (`torch.utils.data.Dataset`, *optional*): 如果提供,将覆盖`self.eval_dataset`。如果它是一个[`~datasets.Dataset`],自动删除模型的`forward()` 方法不接受的列。必须实现`__len__`。 """ # 如果评估集为空,则抛出 ValueError if eval_dataset is None and self.eval_dataset is None: raise ValueError("Trainer: evaluation requires an eval_dataset.") # 创建评估数据集和数据整理器 eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset data_collator = self.data_collator # 如果评估集是数据集的实例,移除未使用的列 if is_datasets_available() and isinstance(eval_dataset, datasets.Dataset): eval_dataset = self._remove_unused_columns(eval_dataset, description="evaluation") # 否则,使用数据整理器移除未使用的列 else: data_collator = self._get_collator_with_removed_columns(data_collator, description="evaluation") # 定义 DataLoader 参数 dataloader_params = { "batch_size": self.args.eval_batch_size, "collate_fn": data_collator, "num_workers": self.args.dataloader_num_workers, "pin_memory": self.args.dataloader_pin_memory, } # 如果评估集不是迭代的数据集,设定采样器和其他参数 if not isinstance(eval_dataset, torch.utils.data.IterableDataset): dataloader_params["sampler"] = self._get_eval_sampler(eval_dataset) dataloader_params["drop_last"] = self.args.dataloader_drop_last # 返回由 accelerator 处理过的 DataLoader return self.accelerator.prepare(DataLoader(eval_dataset, **dataloader_params))
- func get_test_dataloader
def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader: """ 返回测试集的数据加载器 [`~torch.utils.data.DataLoader`] 如果需要插入一些自定义行为,可以在子类中重写此方法 Args: test_dataset (`torch.utils.data.Dataset`, *optional*): 要使用的测试数据集。如果它是一个 [`~datasets.Dataset`],则自动删除 `model.forward()` 方法不接受的列。它必须实现 `__len__` """ data_collator = self.data_collator # 获取数据处理器 # 如果datasets库可用且test_dataset是datasets.Dataset类型,移除不必要的列 if is_datasets_available() and isinstance(test_dataset, datasets.Dataset): test_dataset = self._remove_unused_columns(test_dataset, description="test") else: data_collator = self._get_collator_with_removed_columns(data_collator, description="test") # 定义数据加载器参数 dataloader_params = { "batch_size": self.args.eval_batch_size, # 批大小 "collate_fn": data_collator, # 数据处理函数 "num_workers": self.args.dataloader_num_workers, # 工作线程数量 "pin_memory": self.args.dataloader_pin_memory, # 是否将数据加载器的数据放在固定的内存区域 } # 如果test_dataset不是可迭代数据集,添加采样器和drop_last参数 if not isinstance(test_dataset, torch.utils.data.IterableDataset): dataloader_params["sampler"] = self._get_eval_sampler(test_dataset) # 添加采样器 dataloader_params["drop_last"] = self.args.dataloader_drop_last # 是否丢弃最后不完整的批次 # 返回加速器准备好的数据加载器 return self.accelerator.prepare(DataLoader(test_dataset, **dataloader_params))
4.1.4 一系列优化器函数的实现
- func create_optimizer_and_scheduler
def create_optimizer_and_scheduler(self, num_training_steps: int): """ 设置优化器和学习率调度器 我们提供一个合理的默认值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通过`optimizers`传递一个元组,或者在子类中重写此方法(或`create_optimizer`和/或`create_scheduler`)。 """ self.create_optimizer() # 创建优化器 # 如果SageMaker版本大于等于1.10且启用了fp16,解包优化器 if IS_SAGEMAKER_MP_POST_1_10 and smp.state.cfg.fp16: optimizer = self.optimizer.optimizer else: optimizer = self.optimizer self.create_scheduler(num_training_steps=num_training_steps, optimizer=optimizer) # 创建学习率调度器
- func create_optimizer
def create_optimizer(self): """ 设置优化器。 我们提供一个合理的默认值,工作得很好。如果你想使用其他的,你可以在Trainer的init中通过`optimizers`传递一个元组,或者在子类中重写此方法。 """ # 根据是否启用了SageMaker模型并行,选择不同的模型 opt_model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model # 如果优化器为空,初始化一个新的优化器 if self.optimizer is None: # 获取待优化参数,并区分是否需要权重衰减 decay_parameters = get_parameter_names(opt_model, ALL_LAYERNORM_LAYERS) decay_parameters = [name for name in decay_parameters if "bias" not in name] optimizer_grouped_parameters = [ { "params": [ p for n, p in opt_model.named_parameters() if (n in decay_parameters and p.requires_grad) ], "weight_decay": self.args.weight_decay, # 权重衰减 }, { "params": [ p for n, p in opt_model.named_parameters() if (n not in decay_parameters and p.requires_grad) ], "weight_decay": 0.0, # 不需要权重衰减 }, ] # 获取优化器类和参数 optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs(self.args) # 如果启用了简单的分片DDP,使用OSS作为优化器,否则使用获取的优化器 if self.sharded_ddp == ShardedDDPOption.SIMPLE: self.optimizer = OSS( params=optimizer_grouped_parameters, optim=optimizer_cls, **optimizer_kwargs, ) else: self.optimizer = optimizer_cls(optimizer_grouped_parameters, **optimizer_kwargs) if optimizer_cls.__name__ == "Adam8bit": import bitsandbytes manager = bitsandbytes.optim.GlobalOptimManager.get_instance() skipped = 0 for module in opt_model.modules(): if isinstance(module, nn.Embedding): skipped += sum({p.data_ptr(): p.numel() for p in module.parameters()}.values()) logger.info(f"skipped {module}: {skipped/2**20}M params") manager.register_module_override(module, "weight", {"optim_bits": 32}) logger.debug(f"bitsandbytes: will optimize {module} in fp32") logger.info(f"skipped: {skipped/2**20}M params") # 如果启用了SageMaker模型并行,使用SageMaker的分布式优化器 if is_sagemaker_mp_enabled(): self.optimizer = smp.DistributedOptimizer(self.optimizer) return self.optimizer
- func get_optimizer_cls_and_kwargs
根据提供的参数,选择并配置合适的优化器,以便在模型训练中使用
- 首先,从给定的训练参数中提取优化器参数,并将它们存储在一个字典中。
- 根据训练参数设定初始学习率。
- 针对Adam优化器设定一组基本参数(betas和eps)。
- 接着,根据优化器的类型(存储在args.optim中),选择合适的优化器类,并更新优化器参数。优化器类型可能有很多种,例如Adafactor,AdamW,SGD,Adagrad等等。
- 该函数还支持多种不同的AdamW优化器,例如来自HuggingFace,Torch,Apex等的版本,并根据需要更新参数。其中,对于一些特定的优化器类型(例如,AdamW的torch_xla版本或apex的FusedAdam版本),如果相关的库没有被正确安装,那么将会抛出错误信息。
- 该函数还支持处理来自bitsandbytes库中的优化器(例如,AdamW,Lion等),并能够根据参数调整其配置(例如,是否使用分页式的优化器,是否使用8位优化器等)。
- 对于一些其他特定类型的优化器(例如,来自torchdistx库的AnyPrecisionAdamW优化器),它还支持更多的参数设置。
- 最后,如果给定的优化器名称并没有被程序识别,那么将会抛出一个ValueError。
- 在选择和配置完优化器后,该函数会返回优化器类和优化器参数
4.1.5 学习率相关函数的实现
- func create_scheduler
# 定义创建学习率调度器的函数 def create_scheduler(self, num_training_steps: int, optimizer: torch.optim.Optimizer = None): """ 设置调度器。训练器的优化器必须在调用此方法之前已经设置好,或者作为参数传递。 Args: num_training_steps (int): 要进行的训练步数。 """ # 如果调度器还未设置 if self.lr_scheduler is None: # 使用 get_scheduler 函数创建调度器 self.lr_scheduler = get_scheduler( self.args.lr_scheduler_type, optimizer=self.optimizer if optimizer is None else optimizer, num_warmup_steps=self.args.get_warmup_steps(num_training_steps), num_training_steps=num_training_steps, ) # 返回创建的学习率调度器 return self.lr_scheduler
- func num_examples
- func _hp_search_setup
- func _report_to_hp_search
- func _tune_save_checkpoint
- func call_model_init
- func torch_jit_model_eval
4.1.6 分布式训练相关函数的实现
- func ipex_optimize_model
首先检查了 Intel PyTorch Extension (IPEX) 是否可用。IPEX 是一个基于 Intel oneAPI Deep Neural Network Library (oneDNN) 的 PyTorch 扩展库,可以帮助在 Intel 的硬件(如 CPU)上更高效地运行 PyTorch 程序
如果处于训练模式,函数会使用 IPEX 对模型和优化器进行优化;如果处于非训练模式(例如评估或测试),则仅对模型进行优化 - func_wrap_model
根据参数设置,可能会首先使用 IPEX 对模型进行优化。
如果启用了 Sagemaker 的模型并行,会将模型包装为 Sagemaker 的 DistributedModel。模型并行是一种训练大型模型的技术,它将模型的部分放在不同的 GPU 上,以克服单个 GPU 内存限制
如果模型已经被包装(可能在之前的步骤中被包装),则直接返回该模型
使用 NVIDIA APEX(一种可以提高 GPU 利用率和扩展训练的库)进行混合精度训练。这主要针对 PyTorch 版本小于1.6的情况,因为 PyTorch 1.6 及以上版本已经内置了混合精度训练的支持
如果启用了多 GPU 训练,且模型不是8bit模型(即该模型不支持 int8 类型),则使用 PyTorch 的 DataParallel 对模型进行数据并行处理。数据并行是一种将输入数据分块在多个 GPU 上并行处理的技术,可以有效地利用多个 GPU 进行训练。
如果启用了 JIT 模式评估,则对模型进行 JIT 编译。PyTorch 的 JIT 编译器可以将模型编译为中间表示(IR),然后在运行时对其进行优化,从而提高模型的运行效率。
如果不是训练模式(例如评估或测试),则在这个阶段返回模型,否则继续对模型进行进一步的包装 - func auto_wrapper_callable
- func patched_optimizer_step
4.1.7 主要训练入口:func train和func_inner_training_loop
- func train
""" 主要训练入口 """ def train( self, # 可选参数,接收字符串或布尔类型,代表从哪个检查点恢复训练 resume_from_checkpoint: Optional[Union[str, bool]] = None, # 可选参数,接收Optuna的Trial实例或者包含超参数的字典 trial: Union["optuna.Trial", Dict[str, Any]] = None, # 可选参数,接收一个字符串列表,代表在模型输出中需要忽略的键值 ignore_keys_for_eval: Optional[List[str]] = None, **kwargs, # 接收其他关键字参数,用于隐藏已弃用的参数 ): # 如果resume_from_checkpoint为False,将其设置为None if resume_from_checkpoint is False: resume_from_checkpoint = None # 内存指标 - 必须尽早设置 self._memory_tracker.start() args = self.args # 设置训练状态为True self.is_in_train = True # do_train可能未设置,但仍然可能调用.train(),所以下面的操作是为了避免这种情况 if (args.fp16_full_eval or args.bf16_full_eval) and not args.do_train: self._move_model_to_device(self.model, args.device) # 如果关键字参数中包含model_path if "model_path" in kwargs: # 将model_path的值赋给resume_from_checkpoint并在kwargs中删除model_path resume_from_checkpoint = kwargs.pop("model_path") warnings.warn( "`model_path` is deprecated and will be removed in a future version. Use `resume_from_checkpoint` " "instead.", # 发出关于model_path将在未来版本中删除的警告 FutureWarning, ) # 如果还有未处理的关键字参数 if len(kwargs) > 0: raise TypeError(f"train() received got unexpected keyword arguments: {', '.join(list(kwargs.keys()))}.") # 抛出类型错误 # 这可能会改变随机种子,因此需要先运行 self._hp_search_setup(trial) self._train_batch_size = self.args.train_batch_size # 设置训练批次大小 # 重载模型 model_reloaded = False if self.model_init is not None: # 如果模型初始化方法存在 # 在实例化模型时,必须先设置随机种子 enable_full_determinism(self.args.seed) if self.args.full_determinism else set_seed(self.args.seed) # 使用试验的超参数初始化模型 self.model = self.call_model_init(trial) # 将模型重载标记设置为True model_reloaded = True # 重新初始化优化器和调度器 self.optimizer, self.lr_scheduler = None, None # 加载可能存在的模型检查点 # 如果resume_from_checkpoint是bool类型且值为True if isinstance(resume_from_checkpoint, bool) and resume_from_checkpoint: # 从输出目录中获取最新的检查点 resume_from_checkpoint = get_last_checkpoint(args.output_dir) # 如果没有找到有效的检查点 if resume_from_checkpoint is None: raise ValueError(f"No valid checkpoint found in output directory ({args.output_dir})") # 抛出值错误 # 如果resume_from_checkpoint不为None,并且SageMaker MP和DeepSpeed没有启用 if resume_from_checkpoint is not None and not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled: # 从检查点恢复模型 self._load_from_checkpoint(resume_from_checkpoint) # 如果模型已经重载,将其放在正确的设备上并更新self.model_wrapped if model_reloaded: if self.place_model_on_device: self._move_model_to_device(self.model, args.device) self.model_wrapped = self.model # 查找可执行的批次大小 inner_training_loop = find_executable_batch_size( self._inner_training_loop, self._train_batch_size, args.auto_find_batch_size ) # 进行内部训练循环 return inner_training_loop( args=args, resume_from_checkpoint=resume_from_checkpoint, trial=trial, ignore_keys_for_eval=ignore_keys_for_eval, )
- func_inner_training_loop
-
首先,代码计算了每个epoch中的训练步骤数量(
steps_in_epoch
),这可以是数据加载器的长度,或者是最大步数乘以梯度累积步数。 -
然后,它会处理开始新的训练epoch,包括可能的从检查点恢复训练的步骤。
-
代码遍历了每个训练步骤,每个步骤接收输入数据,并进行以下操作:
- 在每个需要的步骤上同步随机数生成器的状态
- 跳过已经训练过的步骤(如果从检查点恢复训练)
- 调用回调函数处理步骤的开始
- 执行训练步骤,并计算训练损失
- 如果损失是NaN或Inf(无穷),则根据前面记录的损失进行调整
- 计算浮点运算的数量
- 如果达到了梯度累积的步骤,或者是最后一步,会进行以下操作:
- 执行梯度裁剪(如果需要)
- 执行优化器步骤,并判断优化器是否真正执行了步骤
- 如果优化器步骤执行了,进行学习率调度(除了在使用ReduceLROnPlateau学习率调度器的情况下,它需要在生成度量之后才执行调度)
- 模型的梯度清零
- 更新全局步骤和epoch数
- 调用回调函数处理步骤的结束
- 有条件地记录、保存和评估模型
- 如果训练应该停止,或者已经完成了所有的步骤,则退出循环
-
在每个epoch结束时,代码处理epoch的结束,可能会记录、保存和评估模型,检查是否有配置的TPU,并决定是否应该停止整个训练
4.1.8 对模型的加载、检查、评估、保存
- func_get_output_dir
- func_load_from_checkpoint
- func_load_best_model
- func_issue_warnings_after_load
- func_maybe_log_save_evaluate
这个函数主要执行的是在训练过程中的日志记录、模型评估和模型保存的操作。主要步骤包括:
- 记录日志:如果控制标志 should_log 为 True,那么就记录日志。记录的内容包括训练损失、学习率等信息,并对这些信息进行日志输出。
- 评估模型:如果控制标志 should_evaluate 为 True,那么就对模型进行评估。评估的数据集可以是多个,评估的结果将会用于更新学习率调度器或者报告给超参数搜索。
- 保存模型:如果控制标志 should_save 为 True,那么就保存模型的检查点。保存的内容包括模型、评估指标等信息
- func_load_rng_state
- func_save_checkpoint
- func_load_optimizer_and_scheduler
用于从给定的检查点位置加载优化器和学习率调度器的状态
这通常在训练的中断后恢复训练时使用,以确保训练可以从上次停止的地方继续。在加载状态时,需要考虑一些因素,例如是否启用了DeepSpeed,是否启用了SageMaker多处理,是否可用TPU,是否启用了全尺寸数据并行(FSDP)等。各种情况需要采用不同的方式来加载状态 - func opt_load_hook
- func opt_load_hook
- func hyperparameter_search
用于启动超参数搜索。可以使用不同的后端进行搜索,包括optuna、Ray Tune或SigOpt,默认使用optuna
该方法接收一个定义超参数搜索空间的函数,一个计算目标函数的函数,试验次数,优化方向(最小化或最大化),使用的后端,定义试验名称的函数,以及其他参数。这个方法用于寻找最佳的超参数组合,以使模型的性能达到最优 - func log
- func _prepare_input
- func _prepare_inputs
- func compute_loss_context_manager
- func autocast_smart_context_manager
4.1.9 一个训练步骤的实现:前向后向传播、计算损失
- training_step (第2661行-2660行)
一个训练步骤的实现,它涵盖了一个批量数据的前向和后向传播
# `training_step`函数表示训练过程中的一步操作,涵盖了模型的前向和后向传播
def training_step(self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]]) -> torch.Tensor:
# 将模型设置为训练模式,这对于某些层(如Dropout或BatchNorm)的行为有影响,因为它们在训练和评估阶段的行为是不同的
model.train()
# 调用一个辅助方法准备模型的输入,具体的实现取决于模型的需求
inputs = self._prepare_inputs(inputs)
# 如果启用了 SageMaker Model Parallelism,则使用 `smp_forward_backward` 在多个 GPU 上执行前向和后向操作
# 然后减小损失,并将其从计算图中分离
if is_sagemaker_mp_enabled():
loss_mb = smp_forward_backward(model, inputs, self.args.gradient_accumulation_steps)
return loss_mb.reduce_mean().detach().to(self.args.device)
# 计算损失值
with self.compute_loss_context_manager():
loss = self.compute_loss(model, inputs)
# 如果使用的 GPU 数量大于 1,则对损失值取平均,以处理多 GPU 并行训练
if self.args.n_gpu > 1:
loss = loss.mean() # mean() to average on multi-gpu parallel training
# 根据是否进行梯度缩放,选择不同的后向传播方式
if self.do_grad_scaling:
self.scaler.scale(loss).backward() # 使用梯度缩放进行后向传播,可以防止在混合精度训练中出现梯度下溢
elif self.use_apex:
with amp.scale_loss(loss, self.optimizer) as scaled_loss: # 如果使用了APEX工具进行混合精度训练,则需要对损失进行缩放后再进行后向传播
scaled_loss.backward()
else:
self.accelerator.backward(loss) # 使用加速器进行后向传播,适用于没有使用梯度缩放和APEX的情况
# 返回损失值,如果设置了梯度累积步骤,则需要将损失值除以梯度累积步骤数
return loss.detach() / self.args.gradient_accumulation_steps
- compute_loss
计算损失文章来源:https://www.toymoban.com/news/detail-437606.html
# `compute_loss`函数用于计算模型的损失值
def compute_loss(self, model, inputs, return_outputs=False):
# 如果存在标签平滑处理器且输入中有标签,则将标签从输入中移除
if self.label_smoother is not None and "labels" in inputs:
labels = inputs.pop("labels")
else:
labels = None
# 使用模型进行前向传播,得到输出
outputs = model(**inputs)
# 如果存在之前的状态信息,保存它
# TODO: 这部分需要在未来进行清理和优化
if self.args.past_index >= 0:
self._past = outputs[self.args.past_index]
# 如果标签存在,使用标签平滑处理器计算损失
if labels is not None:
if unwrap_model(model)._get_name() in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values():
loss = self.label_smoother(outputs, labels, shift_labels=True)
else:
loss = self.label_smoother(outputs, labels)
else:
# 如果输出是一个字典,但并未包含损失,那么抛出错误
if isinstance(outputs, dict) and "loss" not in outputs:
raise ValueError(
"The model did not return a loss from the inputs, only the following keys: "
f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}."
)
# 我们并未直接使用.outputs,因为模型可能返回的是元组,而非ModelOutput
loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]
# 如果`return_outputs`为真,返回损失和输出;否则只返回损失
return (loss, outputs) if return_outputs else loss
- func is_local_process_zero
- func is_world_process_zero
- func save_model
此函数用于保存模型。如果给出了输出目录,则将在该目录中保存模型,否则将在args.output_dir中保存模型。保存操作依赖于环境,例如,如果是在TPU上,将会调用`_save_tpu`。如果是在SageMaker多处理中,则会保存模型的状态字典。另外,此函数也考虑了`ShardedDDPOption`的设置等。最后,如果设置了`args.push_to_hub`,那么在用户调用`save_model`时,模型会被推送到Hub - func _save_tpu
在TPU上保存模型的专用函数 - func _save
保存模型的基本函数。这个函数不检查进程是否为零,因为只有在进程为零的情况下才会调用此函数 - func store_flos
存储进入模型的浮点运算数。如果模型在分布式模式下运行,该函数会将当前浮点运算数的总数加到`state.total_flos`上,然后将当前浮点运算数归零。在非分布式模式下,也执行相同的操作,只是不需要分布式广播浮点运算数 - func_sorted_checkpoints
返回排序后的检查点列表。使用修改时间或检查点编号进行排序,然后返回路径列表。如果设置了最佳模型检查点,那么确保我们不会删除最佳模型 - func_rotate_checkpoints
- func evaluate
运行评估并返回指标。需要用户提供计算指标的方法,因为它们是任务依赖的。你也可以重写此方法以注入自定义行为。函数返回包含评估损失和可能从预测中计算出的指标的字典。该字典也包含来自训练状态的epoch编号 - func predict
def predict( self, test_dataset: Dataset, ignore_keys: Optional[List[str]] = None, metric_key_prefix: str = "test" ) -> PredictionOutput: # 设置内存跟踪器,尽早启动 self._memory_tracker.start() # 获取测试数据集的数据加载器 test_dataloader = self.get_test_dataloader(test_dataset) # 记录开始时间 start_time = time.time() # 选择预测循环或评估循环,这取决于args中的use_legacy_prediction_loop参数 eval_loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop # 运行选定的循环,并获得预测或评估输出 output = eval_loop( test_dataloader, description="Prediction", ignore_keys=ignore_keys, metric_key_prefix=metric_key_prefix ) # 计算总批次大小,包括所有的并行处理单元 total_batch_size = self.args.eval_batch_size * self.args.world_size # 如果度量指标中包含jit编译时间,那么将这段时间加到开始时间中 if f"{metric_key_prefix}_jit_compilation_time" in output.metrics: start_time += output.metrics[f"{metric_key_prefix}_jit_compilation_time"] # 更新度量指标,包括预测速度相关的指标 output.metrics.update( speed_metrics( metric_key_prefix, start_time, num_samples=output.num_samples, num_steps=math.ceil(output.num_samples / total_batch_size), ) ) # 使用回调处理器进行预测后的操作,并更新控制状态 self.control = self.callback_handler.on_predict(self.args, self.state, self.control, output.metrics) # 停止内存跟踪器,并更新相关度量指标 self._memory_tracker.stop_and_update_metrics(output.metrics) # 返回预测结果,包括预测值,标签(如果存在)和度量指标 return PredictionOutput(predictions=output.predictions, label_ids=output.label_ids, metrics=output.metrics)
- func evaluation_loop
- func_nested_gather
- func_pad_across_processes
- func prediction_step
- func floating_point_ops
- func init_git_repo
- func create_model_card
- func_push_from_checkpoint
- func push_to_hub
- func prediction_loop
- func_gather_and_numpify
- func_add_sm_patterns_to_gitignore
- func create_accelerator_and_postp
// 待更文章来源地址https://www.toymoban.com/news/detail-437606.html
参考文献与推荐阅读
- Transformer通俗笔记:从Word2Vec、Seq2Seq逐步理解到GPT、BERT
- Transformer原始论文(值得反复读几遍):Attention Is All You Need
- Vision Transformer 超详细解读 (原理分析+代码解读) (一)
- Transformer模型详解(图解最完整版)
- The Annotated Transformer(翻译之一),harvard对transformer的简单编码实现
- transformer的细节到底是怎么样的?
- 如何从浅入深理解transformer?
- Transformer 结构详解:位置编码 | Transformer Architecture: The Positional Encoding
- Transformer学习笔记一:Positional Encoding(位置编码)
- 保姆级讲解Transformer
- Jay Alammar写的图解transformer
- 如何理解attention中的Q,K,V?
附录:创作/修改记录
- 4.12-4.14,基本完成第一部分 transformer编码器部分的初稿
- 4.16,彻底完善关于transformer位置编码的阐述,可能是网上对这点最一目了然的阐述了
- 4.17,完成transformer的解码器部分
- 4.18,开始写「第四部分 ChatGLM-6B的代码架构与逐一实现」
- 5.26,新增内容
分词代码的实现:tokenization_chatglm.py
quantization:模型量化——减小模型大小和推理时间 - 5.27,新增“第五部分 基于LangChain + ChatGLM-6B的本地知识库的应用实现”
- 6.8日,完善第五部分
- 7.5日,把原有的「第四部分 ChatGLM-6B的代码架构与逐一实现」放进另一篇博客里:ChatGLM-6B的基座/部署/微调/实现:从GLM到6B的LoRA/P-Tuning微调、及6B源码解读
把原有的“第五部分 基于LangChain + ChatGLM-6B的本地知识库的应用实现”,独立成文为:给LLM装上知识:从LangChain+LLM的本地知识库问答到LLM与知识图谱的结合 - 7.7-7.20日,写本文新的「第四部分 Hugging face社区实现的Transformers库的整体解读」
到了这里,关于类ChatGPT逐行代码解读(1/2):从零实现Transformer、ChatGLM-6B的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!