目录

六、Encoder 编码器

七、Decoder 解码器

八、Transformer 模型构建

九、模型训练

十、模型预测


接上篇,这篇主要讲一下Transformer 框架搭建流程的后半段,先放一个苯人总结的总流程:

Dataprocess 数据处理 --》Position位置编码 --》Mask掩码 --》MHA多头注意力机制 --》FFN前馈神经网络 --》Encoder编码器 --》Decoder解码器 --》Transformer 模型构建 --》模型训练 --》模型预测

六、Encoder 编码器

  编码器模块是 Transformer 的输入处理核心,它由词嵌入层、位置编码层以及多个相同结构的编码器子层组成,其作用是对输入序列进行深层次的特征提取和表示学习。

  词嵌入层负责将离散的 token 映射为连续的向量表示;位置编码用于引入位置信息,使得模型在没有循环结构的前提下仍然能捕捉序列顺序。每个编码器子层内部包括一个多头自注意力机制(用于提取序列中不同位置之间的依赖关系)和一个前馈神经网络(用于增强每个位置的表达能力)。在子层间还包含残差连接和层归一化操作,进一步稳定训练过程并提升模型效果。

实现代码如下:

import torch
from torch import nn

from FFN import FFN
from MHA import MultiHeadAttention
from Mask import att_pad_mask
from position import PositionalEncoding


# 编码器 结合词嵌入 位置编码 编码器子层
class Encoder(nn.Module):
    def __init__(self,vab_size,d_model,n_layers,num_heads,d_ff):
        super(Encoder,self).__init__()
        # 1.词嵌入
        self.embedding = nn.Embedding(vab_size,d_model)
        # 2.位置编码
        self.position_encoding = PositionalEncoding(d_model)
        # 3. 编码器子层
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model,num_heads,d_ff) for _ in range(n_layers)
        ])

    def forward(self,enc_inputs):
        """
        :param enc_inputs: 数据处理好之后结果 【batch,seq】
        :return:
        """
        # 词嵌入
        enc_outputs = self.embedding(enc_inputs)
        # 位置编码
        enc_outputs = self.position_encoding(enc_outputs)

        # 生成掩码
        mask = att_pad_mask(enc_inputs,enc_inputs)

        for encoder_layer in self.encoder_layers:
            enc_outputs = encoder_layer(enc_outputs,mask)

        return enc_outputs

# 组装每个子层 MHA + FFN
class EncoderLayer(nn.Module):
    def __init__(self,d_model,num_heads,d_ff):
        super(EncoderLayer,self).__init__()
        self.enc_self_attn = MultiHeadAttention(d_model,num_heads)
        self.pos_ffn = FFN(d_model,d_ff)
    def forward(self,enc_inputs,mask=None):
        enc_outputs = self.enc_self_attn(enc_inputs,enc_inputs,mask)
        enc_outputs = self.pos_ffn(enc_outputs)

        return enc_outputs

if __name__ == '__main__':
    enc_inputs = torch.randint(0,10,(2,5))
    enc_outputs = Encoder(100,8,2,2,16)(enc_inputs)
    print(enc_outputs.shape)

这里其实编码器子层应该放在前面,后面解码器是这样的顺序

七、Decoder 解码器

  Decoder 把“已生成的目标序列(真实值)”(dec_inputs)和 Encoder 的输出(enc_outputs)结合起来,逐步生成下一步预测,整体结构为:

词嵌入 --》位置编码 --》生成掩码 --》堆叠多个decoder_layer --》输出

其中堆叠的每个 DecoderLayer 的内部顺序是:masked self-attention → cross-attention → FFN.

实现代码如下:

import torch
from torch import nn

from FFN import FFN
from MHA import MultiHeadAttention
from Mask import att_pad_mask, att_sub_mask
from position import PositionalEncoding


# 组装三个子层
class DecoderLayer(nn.Module):
    def __init__(self,d_model,num_heads,d_ff):
        super().__init__()
        # 子层 MHA
        self.dec_self_attn = MultiHeadAttention(d_model,num_heads)
        # cross 子层
        self.enc_dec_attn = MultiHeadAttention(d_model,num_heads)
        # FFN 子层
        self.dec_ffn = FFN(d_model,d_ff)

    def forward(self,dec_inputs,enc_outputs,self_mask=None,cross_mask=None):
        # M-MHA
        dec_output = self.dec_self_attn(dec_inputs,dec_inputs,self_mask)
        # cross MHA
        dec_output = self.enc_dec_attn(enc_outputs,dec_output,cross_mask)
        # FFN
        dec_output = self.dec_ffn(dec_output)

        return dec_output

# 组装 Decoder 包括 词嵌入 位置编码 解码器子层
class Decoder(nn.Module):
    def __init__(self,vab_size,d_model,n_layers,num_heads,d_ff):
        super().__init__()
        self.embedding = nn.Embedding(vab_size,d_model)
        self.position_encoding = PositionalEncoding(d_model)
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model,num_heads,d_ff) for _ in range(n_layers)
        ])
    def forward(self,dec_inputs,enc_inputs,enc_outputs):
        """
        :param dec_inputs: 数据处理之后的结果 【batch,seq】
        :param enc_outputs: 编码器处理之后的结果 【batch,seq,dm】
        :param self_mask: pad+sub
        :param context_mask: pad
        :return:
        """
        emd = self.embedding(dec_inputs)
        decoder_outputs = self.position_encoding(emd)

        # 生成掩码
        self_pad_mask = att_pad_mask(dec_inputs,dec_inputs) #填充掩码
        self_sub_mask = att_sub_mask(dec_inputs) #未来掩码
        self_mask = torch.gt(self_pad_mask + self_sub_mask,0)

        cross_pad_mask = att_pad_mask(enc_inputs,dec_inputs)

        #堆叠多个decoderlayer,每一层依次执行MHA + crossMHA + FFN
        for decoder_layer in self.decoder_layers:
            decoder_outputs = decoder_layer(decoder_outputs,enc_outputs,self_mask,cross_pad_mask)

        return decoder_outputs

if __name__ == '__main__':
    enc_inputs = torch.randint(0,10,(2,5))
    dec_inputs = torch.randint(0,10,(2,5))
    enc_outputs = torch.randn(2,5,16)
    decoder_outputs = Decoder(100,16,2,2,16)(dec_inputs,enc_inputs,enc_outputs)
    print(decoder_outputs.shape)

这样一层层堆起来,就得到了一个能“既参考自己历史,又借助外部信息”的强大解码器。

八、Transformer 模型构建

 现在编码器和解码器都有了就可以搭建 transformer 模型了,苯人总结的搭建流程如下:

准备编码器和解码器(接入之前写好的两个类)--》加入 projection层 --》连接数据流(前向传播)

代码如下:

from torch import nn

from day03.decoder import Decoder
from day03.encoder import Encoder


class Transformer(nn.Module):
    def __init__(self, src_vocab,tgt_vocab_size, d_model,n_layers,num_heads,d_ff):
        super(Transformer, self).__init__()
        #直接从编码器解码器导入
        self.encoder = Encoder(src_vocab, d_model,n_layers,num_heads,d_ff)
        self.decoder = Decoder(tgt_vocab_size,d_model,n_layers,num_heads,d_ff)

        #projection要把 d_model 映射到 目标词表的 logits(d_model -> vocab)
        self.projection = nn.Linear(d_model, tgt_vocab_size)


    def forward(self,src_seq, tgt_seq ):
        '''
        :param src_seq: 原始文本的序列
        :param tgt_seq: 目标译文的序列
        :return:
        '''
        encoder_outputs = self.encoder(src_seq) #输入经过编码器得到编码器输出
        decoder_outputs = self.decoder(tgt_seq, src_seq, encoder_outputs)
        outputs = self.projection(decoder_outputs) # 投影到词表维度
        return outputs

这里就不进行调试了

九、模型训练

有了网络模型后就可以开始训练了:

from torch import nn, optim

from transformer import Transformer
from data_deal import *
#  vocab_size, d_model, d_ff, n_heads, n_layers

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#实例化模型
model = Transformer(src_vocab_size,tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=0)
# momentum 参数指的是动量(Momentum)因子。动量是一种加速梯度下降算法收敛的技术
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

# 数据拿去并且训练
for epoch in range(100):
    model.train()
    for idx, (enc_inputs, dec_inputs, dec_labels) in enumerate(loader):
        enc_inputs, dec_inputs, dec_labels = enc_inputs.to(device), dec_inputs.to(device), dec_labels.to(device)

        optimizer.zero_grad()

        outputs = model(enc_inputs, dec_inputs)
        loss = criterion(outputs, dec_labels.view(-1))

        loss.backward()
        optimizer.step()

    # 显示损失结果
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss.item()))


torch.save(model.state_dict(),'transformer.pth')

训练的逻辑这里就不再多讲,可以看我之前发的深度学习的笔记,跟那个逻辑是一样的,运行结果也不贴了(因为没截图,而且俺们是cpu

那当然也可以进行模型验证,只需要在保存参数前加一个验证代码类似于:

     model.eval()
    # val_loss = 0.0
    # with torch.no_grad():
    #     for i, (enc_v, dec_v_in, dec_v_out) in enumerate(val_loader):
    #         ...
    # 保存 checkpoint(基于验证或周期)

可以每个epoch或者每10个epoch做一次验证都可以,然后取验证准确率最高的模型参数保存

十、模型预测

这里我们用模型进行列生成的“贪婪解码”预测流程,比如给定一个输入序列(比如一句话的编码),模型一个词一个词地生成对应的目标序列(比如翻译结果),直到遇到终止符:

from transformer import Transformer
from data_deal import *

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

#实例化模型
model = Transformer(src_vocab_size,tgt_vocab_size, d_model, d_ff, n_heads, n_layers).to(device)

# 加载模型权重
model.load_state_dict(torch.load('transformer.pth',weights_only=False))
model.eval()  # 切换到评估模式


def greedy_decoder(model, enc_input, start_symbol):
    """
    为了简化,贪婪解码器在 K=1 时相当于束搜索。这在推理中是必要的,因为我们不知道目标序列输入。因此我们尝试逐字生成目标输入,然后将其输入到 Transformer 中。
    起始参考:http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding
    :param model: Transformer 模型
    :param enc_input: 编码器输入
    :param start_symbol: 起始符号。在这个例子中,它是 'S',对应的索引是 6
    :return: 目标输入
    """

    # 编码输入序列
    enc_outputs = model.encoder(enc_input)

    # 初始化解码器输入,使用与 enc_input 数据类型相同的空张量 形状为(1,0)
    dec_input = torch.zeros(1, 0).type_as(enc_input.data)
    # 初始化终止标志为 False
    terminal = False

    # 初始化下一个符号为起始符号
    next_symbol = start_symbol

    # 开始解码循环,直到终止标志为 True
    while not terminal:
        # 将下一个符号添加到解码器输入中
        # .detach() 返回一个新的张量,该张量从当前计算图中分离出来,因此不会对其应用反向传播。
        dec_input = torch.cat([dec_input.detach(), torch.tensor([[next_symbol]], dtype=enc_input.dtype).cuda()], -1)
        # print(dec_input)
        # print(dec_input.shape)

        # 通过解码器获取解码输出
        dec_outputs= model.decoder(enc_input, enc_outputs,dec_input)

        # 将解码输出投影到词汇表维度,得到词概率
        projected = model.projection(dec_outputs)

        # 选择具有最大概率的词
        prob = projected.squeeze(0).max(dim=-1)[1]
        # 获取下一个词作为下一步输出
        next_word = prob[-1].item()  # 使用 .item() 获取 Python 数值
        # 更新下一个符号为下一个词
        next_symbol = next_word

        # 如果下一个符号是句子的终止符号,则设置终止标志为 True
        if next_symbol == tgt_vocab["E"]:
            terminal = True

        # # 打印下一个词
        # print(next_word)

    # 返回生成的解码器输入
    return dec_input


# 测试部分
enc_inputs, _, _ = next(iter(loader))
enc_inputs = enc_inputs.cuda()
for i in range(len(enc_inputs)):
    #遍历每个输入句子,调用 greedy_decoder 得到解码器的预测输入
    greedy_dec_input = greedy_decoder(model, enc_inputs[i].view(1, -1), start_symbol=tgt_vocab["S"])
    # print(greedy_dec_input)

    #用模型的完整 forward 计算预测结果(对照 enc_inputs 和解码器输入得到最终预测分布)
    predict = model(enc_inputs[i].view(1, -1), greedy_dec_input)
    predict = predict.view(-1, predict.size(-1))
    predict = predict.max(1)[1] #用最大概率词的索引生成预测序列。
    # print(predict)
    #打印源句子(通过索引找到单词)和预测结果
    print([src_idx2word[word.item()] for word in enc_inputs[i]], '->', [idx2word[n.item()] for n in predict])

值得注意的是,贪婪解码(greedy decoding)每一步只挑最有可能的词,速度快但有时会“短视”,比如生成的句子不够多样或自然

这篇就到此结束,其实写得有点仓促,因为现在我有其他任务但是实在不想再拖这篇,以上有问题可以指出 (๑•̀ㅂ•́)و✧

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐