引言
本文基于PyTorch实现条件随机场,实现CRF层参考论文Neural Architectures for Named Entity Recognition中关于CRF层的描述。包含大量的图解和例子说明,看完一定能理解!
论文地址: https://arxiv.org/pdf/1603.01360.pdf
也可参考论文笔记: [论文笔记]Neural Architectures for Named Entity Recognition
虽然在尽量控制篇幅,但还是得分成上下两篇:
上一篇: 动手实现条件随机场(上)
实现维特比算法
def decode(
self, emissions: torch.Tensor, mask: torch.ByteTensor = None
) -> list[list[int]]:
"""使用维特比算法找到最有可能的序列
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags), 代表序列中每个单词产生每个标签的得分
mask (torch.ByteTensor | None, optional): 表明哪些元素为填充符。 如果batch_first=False 形状 (seq_len, batch_size) ,否则 形状为 (batch_size, seq_len)
默认为None,表示没有填充符。
Returns:
list[list[int]]: 批次内的最佳标签序列
"""
if mask is None:
mask = torch.ones(emissions.shape[:2], dtype=torch.uint8)
if self.batch_first:
# 转换为seq维度在前的形式
emissions = emissions.transpose(0, 1)
mask = mask.transpose(0, 1)
return self._viterbi(emissions, mask)
首先提供一个对外调用的方法decode
,可以接收批次数据,并且确保接收到的emissions
和mask
的维度是seq_len
在前的。
我们重点来看下_viterbi
方法的实现,即维特比算法。
总体来说维特比算法类似于上面配分函数的计算过程,不过维特比算法不是在每个时间步计算到达所有标签的累计得分,而是计算到达当前时间步某一标签的最佳上一时间步的标签。这里说的最佳是指,使得得分最大的标签。比如:
如图12所示,假设在第1个时间步上到第0个标签,即O
标签的累计得分已知,那么维特比算法选择的就是最大得分,这里是5.5
对应的那条路径,同时会记住该条路径。再加上当前时间步输入产生该标签的得分就得到了当前时间步标签O
的累计得分。
关于维特比算法的详细介绍可以参考文章《统计学习方法》——隐马尔可夫模型,但这里也会通过图解来分析。
第一步是计算由start标签转移到第一个标签的转移得分+产生第一个标签的发射得分,如上图红框框出来的一列:
# 由start到当前时间步所有标签的转移得分 + 批次内所有当前时间步产生所有标签的发射得分
# (num_tags,) + (batch_size, num_tags) -> (batch_size, num_tags)
# score 形状 (batch_size, num_tags) 保存了当前位置,到每个tag的最佳累计得分: 前一累计得分+转移得分+发射得分
score = self.start_transitions + emissions[0]
得到了到当前时间步所有标签的累计得分。假设当前时间步由start到O
的转移得分是3.3,产生O
的发射得分是2.2
,那么累计得分score=5.5
。
接下来定义一个保存目前为止到当前时间步所有标签的最佳候选路径,具体地保存当当前时间步上所有标签的最佳路径的上一个标签。
# 保存了目前为止到当前时间步所有标签的最佳候选路径 最终有seq_len-1个(batch_size, num_tags)的Tensor
history: list[torch.Tensor] = []
但此时我们知道上一个标签一定是start标签,所以不用记录。
下面和上小节一样,进行广播后计算累计得分:
for i in range(1, seq_len):
# broadcast_score: (batch_size, num_tags, 1) = (batch_size, pre_tag, current_tag)
# 所有可能的当前标签current_tag广播
broadcast_score = score.unsqueeze(2)
# 广播成 (batch_size, 1, num_tags)
# shape: (batch_size, 1, num_tags)
broadcast_emissions = emissions[i].unsqueeze(1)
# (batch_size, num_tags, num_tags) = (batch_size, num_tags, 1) + (num_tags, num_tags) + (batch_size, 1, num_tags)
current_score = broadcast_score + self.transitions + broadcast_emissions
不同的地方来了,计算当前时间步完累计得分current_score
后,此时只需要记录最佳路径,比如,对于O
来说,如图14,只需要记录此时间步到O
标签最大转移得分(实际上我们已经累加上了当前标签的发射得分,但对于同一个标签来说,发射得分都是相同的,所以这里描述不冲突)对应的上一时间步的标签,这里上一时间步的标签正好也是O
,整个可以用torch.max
完成,它不仅会返回得分,还会返回对应的索引:
for i in range(1, seq_len):
...
# 计算前一时间步到当前时间步的某个标签的最佳得分
# best_score 形状 (batch_size, num_tags) indices 形状 (batch_size, num_tags)
# indices 相当于 torch.argmax(current_score, dim=1) ,得到产生最大值对应的索引
best_score, indices = torch.max(current_score, dim=1)
上图只画了对于标签O
来说的最佳路径,实际上当前时间步的所有标签都会进行类似的计算。假设该步的计算结果如图15所示,每个标签上都只记录最佳路径:
for i in range(1, seq_len):
...
best_score, indices = torch.max(current_score, dim=1)
# 记录得到最佳得分的前一个索引
history.append(indices)
接下来当然是把最佳路径对应的前一个索引记录下来。如上面代码所示。
当这个for
循环跑完之后,即已经累计到最后一个时间步上所有的标签上了,如下图16所示,下一步是加上转移到stop的转移得分,然后取到end标签的最大得分:
for i in range(1, seq_len):
...
best_score, indices = torch.max(current_score, dim=1)
history.append(indices)
# 加上到end标签的转移得分 end_transitions本身保存的是所有的标签到end标签的得分
# score (batch_size, num_tags)
score += self.end_transitions
# 计算出最后一个时间步到end标签的最大得分 以及对应的索引(tag)
# best_score 形状(batch_size,) indices 形状(batch_size,)
best_score, indices = torch.max(score, dim=1)
对于当前输入来说,此时已经知道最大得分是由哪个标签得到的,最后一步根据history
中保存的信息回溯到第一个时间步即得到整条路径,再逆序变成正向路径:
# 序列最后有效标签的个数
seq_end_tags = mask.long().sum(dim=0) - 1
# 保存需要返回的结果
best_paths: list[list[int]] = []
# 因为批次内每个样本的最后一个有效标签可能不同,因此需要写成for循环
for i in range(batch_size):
best_last_tag = indices[i]
# 通过item()变成普通int
this_path = [best_last_tag.item()]
# history 有 seq_len-1个(batch_size, num_tags), 但是是顺序添加的,history[: seq_end_tags[i]]取有效的history,再逆序
for hist in reversed(history[: seq_end_tags[i]]):
# 先取批次内第i个样本的路径,再取到this_path[-1],即best_last_tag的最佳标签
best_last_tag = hist[i][this_path[-1]]
# 转换为int加入到最佳路径中
this_path.append(best_last_tag.item())
# 这里是通过回溯的方法添加的最佳路径,因此最后还需要reversed逆序,变回顺序的,存入best_tags列表
this_path.reverse()
best_paths.append(this_path)
return best_paths
注意由于可能存在填充,因此要考虑有效标签。seq_end_tags[i]
保存了第i个输入有效标签的个数,只需要取history[: seq_end_tags[i]]
即可,它返回了seq_end_tags[i]
个(batch_size, num_tags)
的Tensor列表。由于history
是正序添加的,这里要取逆序。
this_path
初始化为整个路径的最后一个标签,然后根据它回溯完整个路径。
最后返回的best_paths
是一个列表,列表大小是批次大小,列表中的元素还是一个列表,表示该输入对应的最佳正序标签路径。
最后给出完整代码,有什么问题欢迎留言。文章来源:https://www.toymoban.com/news/detail-503224.html
💡 如果你看完并理解了本文,你会惊醒地发现你理解了pytorch-crf包的源码,因为是大同小异的。文章来源地址https://www.toymoban.com/news/detail-503224.html
完整代码
# CRF Model
class CRF(nn.Module):
"""
条件随机场的实现。
使用前向-后向算法计算输入的对数似然。参考论文 Neural Architectures for Named Entity Recognition 。
基于Python3.10.9和torch-1.13.1
"""
def __init__(
self,
num_tags: int,
batch_first: bool = False,
) -> None:
"""初始化CRF的参数
Args:
num_tags (int): 标签数量
batch_first (bool, optional): 是否batch维度在前,默认为False
"""
super().__init__()
self.num_tags = num_tags
self.batch_first = batch_first
# 转移分数(A),表示两个标签之间转移的得分
# transitions[i,j] 表示由第i个标签转移到第j个标签的得分(可以理解为可能性/置信度)
self.transitions = nn.Parameter(torch.Tensor(num_tags, num_tags))
# 新引入了两个状态:start和end
# 从start状态开始转移的分数
self.start_transitions = nn.Parameter(torch.Tensor(num_tags))
# 转移到end状态的分数
self.end_transitions = nn.Parameter(torch.Tensor(num_tags))
self.reset_parameters()
def __repr__(self):
return f"CRF(num_tags={self.num_tags})"
def reset_parameters(self) -> None:
nn.init.uniform_(self.transitions, -0.1, 0.1)
nn.init.uniform_(self.start_transitions, -0.1, 0.1)
nn.init.uniform_(self.end_transitions, -0.1, 0.1)
def forward(
self,
emissions: torch.Tensor,
tags: torch.LongTensor,
mask: torch.ByteTensor | None = None,
reduction: str = 'sum'
) -> torch.Tensor:
"""计算给定的标签序列tags的负对数似然
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags), 代表序列中每个单词产生每个标签的得分
tags (torch.LongTensor): 标签序列 如果batch_first=False 形状 (seq_len, batch_size) ,否则 形状为 (batch_size, seq_len)
mask (torch.ByteTensor | None, optional): 表明哪些元素为填充符,和tags的形状一致。 如果batch_first=False 形状 (seq_len, batch_size) ,否则 形状为 (batch_size, seq_len)
默认为None,表示没有填充符。
reduction (str): 汇聚函数: none|sum|mean|token_mean 。 none:不应用汇聚函数;
Returns:
torch.Tensor: 输入tags的负对数似然
"""
if mask is None:
# mask 取值为0或1,这里1表示有效标签,默认都为有效标签
mask = torch.ones_like(tags)
if self.batch_first:
# 转换为seq维度在前的形式
emissions = emissions.transpose(0, 1)
tags = tags.transpose(0, 1)
mask = mask.transpose(0, 1)
# 计算标签序列tags的得分
score = self._compute_score(emissions, tags, mask)
# 计算划分函数 partition Z(x)
partition = self._compute_partition(emissions, mask)
# negative log likelihood
nllh = partition - score
if reduction == 'none':
return nllh
if reduction == 'sum':
return nllh.sum()
if reduction == 'mean':
return nllh.mean()
# 否则为 'token_mean'
return nllh.sum() / mask.type_as(emissions).sum()
def _compute_score(
self,
emissions: torch.Tensor,
tags: torch.LongTensor,
mask: torch.ByteTensor,
) -> torch.Tensor:
"""计算标签序列tags的得分
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags)
tags (torch.LongTensor): 标签序列 形状 (seq_len, batch_size)
mask (torch.ByteTensor): 表明哪些元素为填充符 形状 (seq_len, batch_size)
Returns:
torch.Tensor: 批次内标签tags的得分, 形状(batch_size,)
"""
seq_len, batch_size = tags.shape
# first_tags (batch_size,)
first_tags = tags[0]
# 由start标签转移到批次内所有标签序列第一个标签的得分
score = self.start_transitions[first_tags]
# 加上 批次内第一个(index=0)发射得分,即批次内第0个输入产生批次内对应第0个标签的得分
score += emissions[0, torch.arange(batch_size), first_tags]
mask = mask.type_as(emissions) # 类型保持一致
# 这里的index从1开始,也就是第二个位置开始
for i in range(1, seq_len):
# 第i-1个标签转移到第i个标签的得分 + 第i个单词产生第i个标签的得分
# 乘以mask[i]不需要计算填充单词的得分
# score 形状(batch_size,)
score += (
self.transitions[tags[i - 1], tags[i]]
+ emissions[i, torch.arange(batch_size), tags[i]]
) * mask[i]
# last_tags = tags[-1] × 这是错误的!,因为可能包含填充单词
valid_last_idx = mask.long().sum(dim=0) - 1 # 有效的最后一个索引
last_tags = tags[valid_last_idx, torch.arange(batch_size)]
# 最后加上最后一个标签转移到end标签的转移得分
score += self.end_transitions[last_tags]
return score
def _compute_partition(
self, emissions: torch.Tensor, mask: torch.ByteTensor
) -> torch.Tensor:
"""利用CRF的前向算法计算partition的分数
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags)
mask (torch.ByteTensor): 表明哪些元素为填充符 (seq_len, batch_size)
Returns:
torch.Tensor: 批次内的partition分数 形状(batch_size,)
"""
seq_len = emissions.shape[0]
# score (batch_size, num_tags) 对于每个批次来说,第i个元素保存到目前为止以i结尾的所有可能序列的得分
score = self.start_transitions.unsqueeze(0) + emissions[0]
for i in range(1, seq_len):
# broadcast_score: (batch_size, num_tags, 1) = (batch_size, pre_tag, current_tag)
# 所有可能的当前标签current_tag广播
broadcast_score = score.unsqueeze(2)
# 广播成 (batch_size, 1, num_tags)
# shape: (batch_size, 1, num_tags)
broadcast_emissions = emissions[i].unsqueeze(1)
# (batch_size, num_tags, num_tags) = (batch_size, num_tags, 1) + (num_tags, num_tags) + (batch_size, 1, num_tags)
current_score = broadcast_score + self.transitions + broadcast_emissions
# 在前一时间步标签上求和 -> (batch_size, num_tags)
# 对于每个批次来说,第i个元素保存到目前为止以i结尾的所有可能标签序列的得分
current_score = torch.logsumexp(current_score, dim=1)
# mask[i].unsqueeze(1) -> (batch_size, 1)
# 只有mask[i]是有效标签的current_score才将值设置到score中,否则保持原来的score
score = torch.where(mask[i].bool().unsqueeze(1), current_score, score)
# 加上到end标签的转移得分 end_transitions本身保存的是所有的标签到end标签的得分
# score (batch_size, num_tags)
score += self.end_transitions
# 在所有的标签上求(logsumexp)和
# return (batch_size,)
return torch.logsumexp(score, dim=1)
def decode(
self, emissions: torch.Tensor, mask: torch.ByteTensor = None
) -> list[list[int]]:
"""使用维特比算法找到最有可能的序列
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags), 代表序列中每个单词产生每个标签的得分
mask (torch.ByteTensor | None, optional): 表明哪些元素为填充符。 如果batch_first=False 形状 (seq_len, batch_size) ,否则 形状为 (batch_size, seq_len)
默认为None,表示没有填充符。
Returns:
list[list[int]]: 批次内的最佳标签序列
"""
if mask is None:
mask = torch.ones(emissions.shape[:2], dtype=torch.uint8, device=emissions.device)
if self.batch_first:
# 转换为seq维度在前的形式
emissions = emissions.transpose(0, 1)
mask = mask.transpose(0, 1)
return self._viterbi(emissions, mask)
def _viterbi(
self, emissions: torch.Tensor, mask: torch.ByteTensor
) -> list[list[int]]:
"""维特比算法的实现
Args:
emissions (torch.Tensor): 发射分数P 形状 (seq_len, batch_size, num_tags)
mask (torch.ByteTensor): 表明哪些元素为填充符 形状 (seq_len, batch_size)
Returns:
list[list[int]]: 批次内的最佳标签序列
"""
seq_len, batch_size = mask.shape
# 由start到当前时间步所有标签的转移得分 + 批次内所有当前时间步产生所有标签的发射得分
# (num_tags,) + (batch_size, num_tags) -> (batch_size, num_tags)
# score 形状 (batch_size, num_tags) 保存了当前位置,到每个tag的最佳累计得分: 前一累计得分+转移得分+发射得分
score = self.start_transitions + emissions[0]
# 保存了目前为止到当前时间步所有标签的最佳候选路径 最终有seq_len-1个(batch_size, num_tags)的Tensor
history: list[torch.Tensor] = []
for i in range(1, seq_len):
# broadcast_score: (batch_size, num_tags, 1) = (batch_size, pre_tag, current_tag)
# 所有可能的当前标签current_tag广播
broadcast_score = score.unsqueeze(2)
# 广播成 (batch_size, 1, num_tags)
# shape: (batch_size, 1, num_tags)
broadcast_emissions = emissions[i].unsqueeze(1)
# (batch_size, num_tags, num_tags) = (batch_size, num_tags, 1) + (num_tags, num_tags) + (batch_size, 1, num_tags)
current_score = broadcast_score + self.transitions + broadcast_emissions
# 计算前一时间步到当前时间步的某个标签的最佳得分
# best_score 形状 (batch_size, num_tags) indices 形状 (batch_size, num_tags)
# indices 相当于 torch.argmax(current_score, dim=1) ,得到产生最大值对应的索引
best_score, indices = torch.max(current_score, dim=1)
# mask[i].unsqueeze(1) -> (batch_size, 1)
# 只有mask[i]是有效标签的best_score才将值设置到score中,否则保持原来的score
score = torch.where(mask[i].bool().unsqueeze(1), best_score, score)
# 记录得到最佳得分的前一个索引
history.append(indices)
# 加上到end标签的转移得分 end_transitions本身保存的是所有的标签到end标签的得分
# score (batch_size, num_tags)
score += self.end_transitions
# 计算出最后一个时间步到end标签的最大得分 以及对应的索引(tag)
# best_score 形状(batch_size,) indices 形状(batch_size,)
best_score, indices = torch.max(score, dim=1)
# 序列最后有效标签的个数
seq_end_tags = mask.long().sum(dim=0) - 1
# 保存需要返回的结果
best_paths: list[list[int]] = []
# 因为批次内每个样本的最后一个有效标签可能不同,因此需要写成for循环
for i in range(batch_size):
best_last_tag = indices[i]
# 通过item()变成普通int
this_path = [best_last_tag.item()]
# history 有 seq_len-1个(batch_size, num_tags), 但是是顺序添加的,history[: seq_end_tags[i]]取有效的history,再逆序
for hist in reversed(history[: seq_end_tags[i]]):
# 先取批次内第i个样本的路径,再取到this_path[-1],即best_last_tag的最佳标签
best_last_tag = hist[i][this_path[-1]]
# 转换为int加入到最佳路径中
this_path.append(best_last_tag.item())
# 这里是通过回溯的方法添加的最佳路径,因此最后还需要reversed逆序,变回顺序的,存入best_tags列表
this_path.reverse()
best_paths.append(this_path)
return best_paths
参考
- 统计学习方法——条件随机场
- pytorch-crf
- ADVANCED: MAKING DYNAMIC DECISIONS AND THE BI-LSTM CR
- 《统计学习方法》——隐马尔可夫模型
到了这里,关于动手实现条件随机场(下)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!