大模型 Transformer模型(下)
六、Encoder 编码器编码器模块是 Transformer 的输入处理核心,它由词嵌入层、位置编码层以及多个相同结构的编码器子层组成,其作用是词嵌入层负责将离散的 token 映射为连续的向量表示;位置编码用于引入位置信息,使得模型在没有循环结构的前提下仍然能捕捉序列顺序。每个编码器子层内部包括一个多头自注意力机制(用于提取序列中不同位置之间的依赖关系)和一个前馈神经网络(用于增强每个位置的表
目录
接上篇,这篇主要讲一下Transformer 框架搭建流程的后半段,先放一个苯人总结的总流程:
Dataprocess 数据处理 --》Position位置编码 --》Mask掩码 --》MHA多头注意力机制 --》FFN前馈神经网络 --》Encoder编码器 --》Decoder解码器 --》Transformer 模型构建 --》模型训练 --》模型预测
六、Encoder 编码器
编码器模块是 Transformer 的输入处理核心,它由词嵌入层、位置编码层以及多个相同结构的编码器子层组成,其作用是对输入序列进行深层次的特征提取和表示学习。
词嵌入层负责将离散的 token 映射为连续的向量表示;位置编码用于引入位置信息,使得模型在没有循环结构的前提下仍然能捕捉序列顺序。每个编码器子层内部包括一个多头自注意力机制(用于提取序列中不同位置之间的依赖关系)和一个前馈神经网络(用于增强每个位置的表达能力)。在子层间还包含残差连接和层归一化操作,进一步稳定训练过程并提升模型效果。
实现代码如下:
import torch
from torch import nn
from FFN import FFN
from MHA import MultiHeadAttention
from Mask import att_pad_mask
from position import PositionalEncoding
# 编码器 结合词嵌入 位置编码 编码器子层
class Encoder(nn.Module):
def __init__(self,vab_size,d_model,n_layers,num_heads,d_ff):
super(Encoder,self).__init__()
# 1.词嵌入
self.embedding = nn.Embedding(vab_size,d_model)
# 2.位置编码
self.position_encoding = PositionalEncoding(d_model)
# 3. 编码器子层
self.encoder_layers = nn.ModuleList([
EncoderLayer(d_model,num_heads,d_ff) for _ in range(n_layers)
])
def forward(self,enc_inputs):
"""
:param enc_inputs: 数据处理好之后结果 【batch,seq】
:return:
"""
# 词嵌入
enc_outputs = self.embedding(enc_inputs)
# 位置编码
enc_outputs = self.position_encoding(enc_outputs)
# 生成掩码
mask = att_pad_mask(enc_inputs,enc_inputs)
for encoder_layer in self.encoder_layers:
enc_outputs = encoder_layer(enc_outputs,mask)
return enc_outputs
# 组装每个子层 MHA + FFN
class EncoderLayer(nn.Module):
def __init__(self,d_model,num_heads,d_ff):
super(EncoderLayer,self).__init__()
self.enc_self_attn = MultiHeadAttention(d_model,num_heads)
self.pos_ffn = FFN(d_model,d_ff)
def forward(self,enc_inputs,mask=None):
enc_outputs = self.enc_self_attn(enc_inputs,enc_inputs,mask)
enc_outputs = self.pos_ffn(enc_outputs)
return enc_outputs
if __name__ == '__main__':
enc_inputs = torch.randint(0,10,(2,5))
enc_outputs = Encoder(100,8,2,2,16)(enc_inputs)
print(enc_outputs.shape)
这里其实编码器子层应该放在前面,后面解码器是这样的顺序
七、Decoder 解码器
Decoder 把“已生成的目标序列(真实值)”(dec_inputs)和 Encoder 的输出(enc_outputs)结合起来,逐步生成下一步预测,整体结构为:
词嵌入 --》位置编码 --》生成掩码 --》堆叠多个decoder_layer --》输出
其中堆叠的每个 DecoderLayer 的内部顺序是:masked self-attention → cross-attention → FFN.
实现代码如下:
import torch
from torch import nn
from FFN import FFN
from MHA import MultiHeadAttention
from Mask import att_pad_mask, att_sub_mask
from position import PositionalEncoding
# 组装三个子层
class DecoderLayer(nn.Module):
def __init__(self,d_model,num_heads,d_ff):
super().__init__()
# 子层 MHA
self.dec_self_attn = MultiHeadAttention(d_model,num_heads)
# cross 子层
self.enc_dec_attn = MultiHeadAttention(d_model,num_heads)
# FFN 子层
self.dec_ffn = FFN(d_model,d_ff)
def forward(self,dec_inputs,enc_outputs,self_mask=None,cross_mask=None):
# M-MHA
dec_output = self.dec_self_attn(dec_inputs,dec_inputs,self_mask)
# cross MHA
dec_output = self.enc_dec_attn(enc_outputs,dec_output,cross_mask)
# FFN
dec_output = self.dec_ffn(dec_output)
return dec_output
# 组装 Decoder 包括 词嵌入 位置编码 解码器子层
class Decoder(nn.Module):
def __init__(self,vab_size,d_model,n_layers,num_heads,d_ff):
super().__init__()
self.embedding = nn.Embedding(vab_size,d_model)
self.position_encoding = PositionalEncoding(d_model)
self.decoder_layers = nn.ModuleList([
DecoderLayer(d_model,num_heads,d_ff) for _ in range(n_layers)
])
def forward(self,dec_inputs,enc_inputs,enc_outputs):
"""
:param dec_inputs: 数据处理之后的结果 【batch,seq】
:param enc_outputs: 编码器处理之后的结果 【batch,seq,dm】
:param self_mask: pad+sub
:param context_mask: pad
:return:
"""
emd = self.embedding(dec_inputs)
decoder_outputs = self.position_encoding(emd)
# 生成掩码
self_pad_mask = att_pad_mask(dec_inputs,dec_inputs) #填充掩码
self_sub_mask = att_sub_mask(dec_inputs) #未来掩码
self_mask = torch.gt(self_pad_mask + self_sub_mask,0)
cross_pad_mask = att_pad_mask(enc_inputs,dec_inputs)
#堆叠多个decoderlayer,每一层依次执行MHA + crossMHA + FFN
for decoder_layer in self.decoder_layers:
decoder_outputs = decoder_layer(decoder_outputs,enc_outputs,self_mask,cross_pad_mask)
return decoder_outputs
if __name__ == '__main__':
enc_inputs = torch.randint(0,10,(2,5))
dec_inputs = torch.randint(0,10,(2,5))
enc_outputs = torch.randn(2,5,16)
decoder_outputs = Decoder(100,16,2,2,16)(dec_inputs,enc_inputs,enc_outputs)
print(decoder_outputs.shape)
这样一层层堆起来,就得到了一个能“既参考自己历史,又借助外部信息”的强大解码器。
八、Transformer 模型构建
现在编码器和解码器都有了就可以搭建 transformer 模型了,苯人总结的搭建流程如下:
准备编码器和解码器(接入之前写好的两个类)--》加入 projection层 --》连接数据流(前向传播)
代码如下:
from torch import nn
from day03.decoder import Decoder
from day03.encoder import Encoder
class Transformer(nn.Module):
def __init__(self, src_vocab,tgt_vocab_size, d_model,n_layers,num_heads,d_ff):
super(Transformer, self).__init__()
#直接从编码器解码器导入
self.encoder = Encoder(src_vocab, d_model,n_layers,num_heads,d_ff)
self.decoder = Decoder(tgt_vocab_size,d_model,n_layers,num_heads,d_ff)
#projection要把 d_model 映射到 目标词表的 logits(d_model -> vocab)
self.projection = nn.Linear(d_model, tgt_vocab_size)
def forward(self,src_seq, tgt_seq ):
'''
:param src_seq: 原始文本的序列
:param tgt_seq: 目标译文的序列
:return:
'''
encoder_outputs = self.encoder(src_seq) #输入经过编码器得到编码器输出
decoder_outputs = self.decoder(tgt_seq, src_seq, encoder_outputs)
outputs = self.projection(decoder_outputs) # 投影到词表维度
return outputs
这里就不进行调试了
九、模型训练
有了网络模型后就可以开始训练了:
from torch import nn, optim
from transformer import Transformer
from data_deal import *
# vocab_size, d_model, d_ff, n_heads, n_layers
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#实例化模型
model = Transformer(src_vocab_size,tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)
# 损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=0)
# momentum 参数指的是动量(Momentum)因子。动量是一种加速梯度下降算法收敛的技术
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)
# 数据拿去并且训练
for epoch in range(100):
model.train()
for idx, (enc_inputs, dec_inputs, dec_labels) in enumerate(loader):
enc_inputs, dec_inputs, dec_labels = enc_inputs.to(device), dec_inputs.to(device), dec_labels.to(device)
optimizer.zero_grad()
outputs = model(enc_inputs, dec_inputs)
loss = criterion(outputs, dec_labels.view(-1))
loss.backward()
optimizer.step()
# 显示损失结果
print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss.item()))
torch.save(model.state_dict(),'transformer.pth')
训练的逻辑这里就不再多讲,可以看我之前发的深度学习的笔记,跟那个逻辑是一样的,运行结果也不贴了(因为没截图,而且俺们是cpu
那当然也可以进行模型验证,只需要在保存参数前加一个验证代码类似于:
model.eval()
# val_loss = 0.0
# with torch.no_grad():
# for i, (enc_v, dec_v_in, dec_v_out) in enumerate(val_loader):
# ...
# 保存 checkpoint(基于验证或周期)
可以每个epoch或者每10个epoch做一次验证都可以,然后取验证准确率最高的模型参数保存
十、模型预测
这里我们用模型进行列生成的“贪婪解码”预测流程,比如给定一个输入序列(比如一句话的编码),模型一个词一个词地生成对应的目标序列(比如翻译结果),直到遇到终止符:
from transformer import Transformer
from data_deal import *
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#实例化模型
model = Transformer(src_vocab_size,tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)
# 加载模型权重
model.load_state_dict(torch.load('transformer.pth',weights_only=False))
model.eval() # 切换到评估模式
def greedy_decoder(model, enc_input, start_symbol):
"""
为了简化,贪婪解码器在 K=1 时相当于束搜索。这在推理中是必要的,因为我们不知道目标序列输入。因此我们尝试逐字生成目标输入,然后将其输入到 Transformer 中。
起始参考:http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding
:param model: Transformer 模型
:param enc_input: 编码器输入
:param start_symbol: 起始符号。在这个例子中,它是 'S',对应的索引是 6
:return: 目标输入
"""
# 编码输入序列
enc_outputs = model.encoder(enc_input)
# 初始化解码器输入,使用与 enc_input 数据类型相同的空张量 形状为(1,0)
dec_input = torch.zeros(1, 0).type_as(enc_input.data)
# 初始化终止标志为 False
terminal = False
# 初始化下一个符号为起始符号
next_symbol = start_symbol
# 开始解码循环,直到终止标志为 True
while not terminal:
# 将下一个符号添加到解码器输入中
# .detach() 返回一个新的张量,该张量从当前计算图中分离出来,因此不会对其应用反向传播。
dec_input = torch.cat([dec_input.detach(), torch.tensor([[next_symbol]], dtype=enc_input.dtype).cuda()], -1)
# print(dec_input)
# print(dec_input.shape)
# 通过解码器获取解码输出
dec_outputs= model.decoder(enc_input, enc_outputs,dec_input)
# 将解码输出投影到词汇表维度,得到词概率
projected = model.projection(dec_outputs)
# 选择具有最大概率的词
prob = projected.squeeze(0).max(dim=-1)[1]
# 获取下一个词作为下一步输出
next_word = prob[-1].item() # 使用 .item() 获取 Python 数值
# 更新下一个符号为下一个词
next_symbol = next_word
# 如果下一个符号是句子的终止符号,则设置终止标志为 True
if next_symbol == tgt_vocab["E"]:
terminal = True
# # 打印下一个词
# print(next_word)
# 返回生成的解码器输入
return dec_input
# 测试部分
enc_inputs, _, _ = next(iter(loader))
enc_inputs = enc_inputs.cuda()
for i in range(len(enc_inputs)):
#遍历每个输入句子,调用 greedy_decoder 得到解码器的预测输入
greedy_dec_input = greedy_decoder(model, enc_inputs[i].view(1, -1), start_symbol=tgt_vocab["S"])
# print(greedy_dec_input)
#用模型的完整 forward 计算预测结果(对照 enc_inputs 和解码器输入得到最终预测分布)
predict = model(enc_inputs[i].view(1, -1), greedy_dec_input)
predict = predict.view(-1, predict.size(-1))
predict = predict.max(1)[1] #用最大概率词的索引生成预测序列。
# print(predict)
#打印源句子(通过索引找到单词)和预测结果
print([src_idx2word[word.item()] for word in enc_inputs[i]], '->', [idx2word[n.item()] for n in predict])
值得注意的是,贪婪解码(greedy decoding)每一步只挑最有可能的词,速度快但有时会“短视”,比如生成的句子不够多样或自然
这篇就到此结束,其实写得有点仓促,因为现在我有其他任务但是实在不想再拖这篇,以上有问题可以指出 (๑•̀ㅂ•́)و✧
更多推荐
所有评论(0)