2.1 transformer模型原理及代码(python)
本文参考了其他一些博客,在这里做一个汇总,参考最多的是博客《Transformer原理详解(图解完整版附代码)》(知乎)、https://zhuanlan.zhihu.com/p/631398525、《Transformer模型详解(图解最完整版)》(知乎)以及 https://github.com/JayParks/transformer,也参考了其他博客,这里不再一一列举,本文主要作为学习笔记使用。
1.transformer模型结构
模型的结构如下图所示:

Transformer 模型广泛应用于自然语言处理、大模型、计算机视觉等领域。该模型的核心特征是自注意力机制(Self-Attention),这使得模型能够有效地处理序列数据中的长距离依赖问题。由于其出色的并行处理能力和优异的性能,Transformer 已成为当今自然语言处理领域的基石技术。
其实模型由上图中的 Inputs 一列和 Outputs 一列两大部分组成,可以用下图理解:

通过上图可以看出,transformer 由 encoder 和 decoder 两大部分构成:encoder 由 Input Embedding(输入嵌入)、Positional Encoding(位置编码)、Multi-Head Attention(多头注意力机制)和 Feed Forward(前馈网络)四部分构成;decoder 由 Masked Multi-Head Attention(带掩码的多头注意力机制)、Multi-Head Attention(多头注意力机制)、Feed Forward(前馈网络)和输出端的分类器四部分构成。下图是另一篇博客从内容的角度对 encoder 和 decoder 两个框架的解释,大家可以参考理解。

下面从各模块分别介绍各自的功能和原理。
2.encoder模块
在第一部分已经说过,encoder模块由 input Embedding (输入嵌入) 、Positional Encoding(位置编码) 、Multi-Head Attention 多头注意力机制和 Feed Forward 前馈网络四部分构成,下面我们分开介绍各部分。
2.1 input Embedding (输入嵌入)
Transformer 中单词的输入表示 x由单词 Embedding 和位置 Embedding (Positional Encoding)相加得到。

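下面先用一个极简的示意代码(非原文代码,词表大小、维度等均为随意假设的小值,真正的位置编码实现见 2.2 节)演示"单词 Embedding + 位置 Embedding"这一步:
import torch
import torch.nn as nn

# 极简示意:输入表示 x = 单词Embedding + 位置Embedding(数值均为假设)
vocab_size, d_model_demo, seq_len = 10, 8, 5     # 假设的小词表和小维度
token_ids = torch.tensor([[1, 3, 5, 2, 0]])      # [batch_size=1, seq_len=5]

tok_emb = nn.Embedding(vocab_size, d_model_demo) # 单词 Embedding:按索引查表
pos_table = torch.randn(seq_len, d_model_demo)   # 这里用随机张量代替真正的位置编码,仅示意形状

x = tok_emb(token_ids) + pos_table               # 逐元素相加,得到 Transformer 的输入 x
print(x.shape)                                   # torch.Size([1, 5, 8])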
输入层相关的数据构建代码如下:
# 数据构建
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# device = 'cpu'
device = 'cuda'
# transformer epochs
epochs = 100
# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E']
]
# 测试集(希望transformer能达到的效果)
# 输入:"我 有 一 个 女 朋 友"
# 输出:"i have a girlfriend"
# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,
'个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
src_idx2word = {i: w for i, w in enumerate(src_vocab)}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 8 # (源句子的长度)enc_input max sequence length
tgt_len = 7 # dec_input(=dec_output) max sequence length
# Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
# FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
d_ff = 2048
d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
n_heads = 8 # number of heads in Multi-Head Attention(有几套头)
# ==============================================================================================
# 数据构建
def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
# [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
enc_inputs.extend(enc_input)
# [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
dec_inputs.extend(dec_input)
# [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
print(enc_inputs)
print(dec_inputs)
print(dec_outputs)
class MyDataSet(Data.Dataset):
"""自定义DataLoader"""
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(
MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
运行代码效果如下:

下面我们继续看 embedding 的原理及代码,这里参考了博客《Pytorch常用的函数(二)pytorch中nn.Embedding原理及使用》(CSDN),该博客讲得很详细,这里不多做解释。
首先代码如下:
import torch
import torch.nn as nn
import torch.optim as optim

# 定义一个简单的文本分类模型
class SimpleClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(SimpleClassifier, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)  # 嵌入层
        # self.linear = nn.Linear(embedding_dim, 2)  # 线性层,假设有两个输出类别

    def forward(self, inputs):
        embeds = self.embeddings(inputs)  # 将输入的整数索引转换为嵌入向量
        return embeds

if __name__ == "__main__":
    # 假定的简化文本数据集和对应的标签
    sentences = ["I love machine learning", "PyTorch is great for deep learning"]
    labels = torch.tensor([0, 1], dtype=torch.long)  # 0 和 1 代表两个不同的类别 : tensor([0, 1])
    # 分词并构建词汇表:每个唯一单词映射到一个唯一的整数
    # 例如 {'pytorch': 0, 'learning': 1, 'i': 2, 'is': 3, 'love': 4, 'machine': 5, 'for': 6, 'deep': 7, 'great': 8}
    word_to_ix = {word: i for i, word in enumerate(set(" ".join(sentences).lower().split()))}
    print("1.=====> ", word_to_ix)
    # 准备训练数据:将文本转换为整数索引的形式
    # [[2, 4, 5, 1], [0, 3, 8, 6, 7, 1]]:表示sentences内两个句子的每个单词在word_to_ix里的index位置
    data = [[word_to_ix[word] for word in sentence.lower().split()] for sentence in sentences]
    print("2.=====> ", data)
    # tensor([3, 4]):3 = (2+4+5+1) // 4 ; 4 = (0+3+8+6+7+1) // 6
    data = torch.tensor([sum(x) // len(x) for x in data], dtype=torch.long)  # 简化处理:将句子表示为平均索引
    print("3.=====> ", data)
    # 初始化模型(vocab_size为词汇表大小,embedding_dim为嵌入向量的维度)
    model = SimpleClassifier(len(word_to_ix), 5)
    print()
    # 前向传播演示(这里只迭代1次)
    for epoch in range(1):
        for instance, label in zip(data, labels):
            model.zero_grad()  # 清除梯度
            print("4.-----> ", instance, instance.view(1, -1))
            log_probs = model(instance.view(1, -1))  # 前向传播
            print()
            print("5.-----> ", log_probs)
运行结果如下:

Embedding 的原理如下:nn.Embedding 本质上是一张形状为 (vocab_size, embedding_dim) 的查找表,前向计算就是按输入的整数索引取出对应的行向量,这等价于先把索引转成 one-hot 向量,再与这张权重矩阵做矩阵乘法。
代码展示如下:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F

if __name__ == "__main__":
    embedding = nn.Embedding(10, 3)  # (v, h) ===》 (10, 3)
    input = torch.LongTensor([
        [1, 2, 4, 5],
        [4, 3, 2, 9]
    ])  # 表示这批有2个句子,每个句子由4个单词构成 (b, s)
    # 1. embedding的等价实现过程:one-hot 矩阵乘以权重矩阵
    input_onehot = F.one_hot(input, num_classes=10)
    # print(input.shape)
    print("5.======> ", input_onehot.shape)  # (b, s, v) ===》 (2, 4, 10)
    print(input_onehot)
    print("6.======> ", embedding.weight.shape)  # (v, h) ===》 (10, 3)
    print(embedding.weight)
    output = torch.matmul(input_onehot.type(torch.float32), embedding.weight)
    print("7.======> ", output.shape)  # (b, s, v) @ (v, h) ===》 (b, s, h) ===》 (2, 4, 3)
    print(output)
    # 2. embedding层的直接调用,结果与上面的矩阵乘法一致
    out = embedding(input)
    print("8.======> ", out.shape)  # (b, s, h) ===》 (2, 4, 3)
运行结果如下:


2.2 Positional Encoding(位置编码)
Transformer 模型本身不具备感知序列顺序的能力,而在处理文本时,当前单词与前后单词是相互关联的,因此需要在输入嵌入层(Input Embedding)之后加入位置编码(Positional Encoding),用位置 Embedding 保存单词在序列中的相对或绝对位置,使后续的训练能够利用文本的位置信息,从而更精准地建模。
位置 Embedding 用 PE 表示,PE 的维度与单词 Embedding 是一样的。PE 可以通过训练得到,也可以使用某种公式计算得到。在 Transformer 中采用了后者,计算公式如下:
$$PE_{(pos,\,2i)} = \sin\left(\frac{pos}{10000^{2i/d}}\right),\qquad PE_{(pos,\,2i+1)} = \cos\left(\frac{pos}{10000^{2i/d}}\right)$$
其中,pos 表示单词在句子中的位置,d 表示 PE的维度 (与词 Embedding 一样),2i 表示偶数的维度,2i+1 表示奇数维度 (即 2i≤d, 2i+1≤d)。使用这种公式计算 PE 有以下的好处:
- 使 PE 能够适应比训练集里面所有句子更长的句子,假设训练集里面最长的句子是有 20 个单词,突然来了一个长度为 21 的句子,则使用公式计算的方法可以计算出第 21 位的 Embedding。
- 可以让模型容易地计算出相对位置,对于固定长度的间距 k,PE(pos+k) 可以用 PE(pos) 计算得到。因为 Sin(A+B) = Sin(A)Cos(B) + Cos(A)Sin(B), Cos(A+B) = Cos(A)Cos(B) - Sin(A)Sin(B)。
将单词的词 Embedding 和位置 Embedding 相加,就可以得到单词的表示向量 x,x 就是 Transformer 的输入。
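针对上面第二条"相对位置"的性质,下面给出一个简单的数值验证(非原文代码,维度取 8 仅作演示):
import math
import torch

d_demo, base = 8, 10000.0

def pe(pos):
    """按公式计算位置 pos 的位置编码向量:偶数维用 sin,奇数维用 cos"""
    v = torch.zeros(d_demo)
    for i in range(0, d_demo, 2):
        angle = pos / (base ** (i / d_demo))
        v[i], v[i + 1] = math.sin(angle), math.cos(angle)
    return v

pos, k = 3, 5
pe_pos, pe_direct = pe(pos), pe(pos + k)      # 分别直接计算 PE(pos) 和 PE(pos+k)

# 利用 sin/cos 的和角公式,仅用 PE(pos) 和只依赖 k 的系数线性组合出 PE(pos+k)
pe_linear = torch.zeros(d_demo)
for i in range(0, d_demo, 2):
    wk = k / (base ** (i / d_demo))           # 偏移 k 在该维度上对应的角度
    sin_p, cos_p = pe_pos[i], pe_pos[i + 1]
    pe_linear[i] = sin_p * math.cos(wk) + cos_p * math.sin(wk)      # sin(A+B)
    pe_linear[i + 1] = cos_p * math.cos(wk) - sin_p * math.sin(wk)  # cos(A+B)

print(torch.allclose(pe_direct, pe_linear, atol=1e-6))  # True:PE(pos+k) 可由 PE(pos) 线性表示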
代码如下(该代码可以独立运行,其中 PositionalEncoding(nn.Module) 类就是上面位置编码公式的实现):
# 数据构建
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# device = 'cpu'
device = 'cuda'
# transformer epochs
epochs = 100
# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E']
]
# 测试集(希望transformer能达到的效果)
# 输入:"我 有 一 个 女 朋 友"
# 输出:"i have a girlfriend"
# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,'个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
src_idx2word = {i: w for i, w in enumerate(src_vocab)} # {0: 'P', 1: '我', 2: '有', 3: '一', 4: '个', 5: '好', 6: '朋', 7: '友', 8: '零', 9: '女', 10: '男'}
src_vocab_size = len(src_vocab) # 11
tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {i: w for i, w in enumerate(tgt_vocab)} # {0: 'P', 1: 'I', 2: 'have', 3: 'a', 4: 'good', 5: 'friend', 6: 'zero', 7: 'girl', 8: 'boy', 9: 'S', 10: 'E', 11: '.'}
tgt_vocab_size = len(tgt_vocab)
# src_len = 8 # (源句子的长度)enc_input max sequence length
# tgt_len = 7 # dec_input(=dec_output) max sequence length
#
# # Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
# # FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
# d_ff = 2048
# d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
# n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
# n_heads = 8 # number of heads in Multi-Head Attention(有几套头)
# ==============================================================================================
# 数据构建
def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
# [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
enc_inputs.extend(enc_input)
# [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
dec_inputs.extend(dec_input)
# [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
# print(enc_inputs)
# print(dec_inputs)
# print(dec_outputs)
class MyDataSet(Data.Dataset):
"""自定义DataLoader"""
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# ===============================================> transformer模型 <===========================================
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model) # [5000, 512]
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # [5000, 1]
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # [256]
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term) # [5000, 512]
pe = pe.unsqueeze(0).transpose(0, 1) # [5000, 1, 512]
self.register_buffer('pe', pe)
def forward(self, x): # x=====>[8, 2, 512]
"""
x: [seq_len, batch_size, d_model]
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model) # token Embedding (11,512)
self.pos_emb = PositionalEncoding(d_model) # Transformer中位置编码时固定的,不需要学习 PositionalEncoding( (dropout): Dropout(p=0.1, inplace=False) )
def forward(self, enc_inputs):
"""
enc_inputs: [batch_size, src_len]
"""
enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model] =====> [2, 8, 512]
# enc_outputs.transpose(0, 1)) =====> [src_len, batch_size, d_model] <=====>[8, 2, 512]
enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose( 0, 1) # [batch_size, src_len, d_model]
return enc_outputs
class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
self.encoder = Encoder().to(device)
def forward(self, enc_inputs, dec_inputs):
enc_outputs= self.encoder(enc_inputs)
return enc_outputs
if __name__ == "__main__":
model = Transformer().to(device)
print(model)
# 这里的损失函数里面设置了一个参数 ignore_index=0,因为 "pad" 这个单词的索引为 0,这样设置以后,就不会计算 "pad" 的损失(因为本来 "pad" 也没有意义,不需要计算)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=1e-3,momentum=0.99) # 用adam的话效果不好
# ====================================================================================================
epochs = 2
for epoch in range(epochs):
for enc_inputs, dec_inputs, dec_outputs in loader:
"""
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
dec_outputs: [batch_size, tgt_len]
"""
enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device), dec_inputs.to(device), dec_outputs.to(device)
# outputs: [batch_size * tgt_len, tgt_vocab_size]
enc_outputs = model(enc_inputs, dec_inputs)
运行结果如下:

2.3 Multi-Head Attention (多头注意力机制)
先看代码1:
# 数据构建
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# device = 'cpu'
device = 'cuda'
# transformer epochs
epochs = 100
# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E']
]
# 测试集(希望transformer能达到的效果)
# 输入:"我 有 一 个 女 朋 友"
# 输出:"i have a girlfriend"
# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,
'个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
src_idx2word = {i: w for i, w in enumerate(src_vocab)}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 8 # (源句子的长度)enc_input max sequence length
tgt_len = 7 # dec_input(=dec_output) max sequence length
# Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
# FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
d_ff = 2048
d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
n_heads = 8 # number of heads in Multi-Head Attention(有几套头)
# ==============================================================================================
# 数据构建
def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
# [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
enc_inputs.extend(enc_input)
# [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
dec_inputs.extend(dec_input)
# [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
# print(enc_inputs)
# print(dec_inputs)
# print(dec_outputs)
class MyDataSet(Data.Dataset):
"""自定义DataLoader"""
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(
MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# ==========================================================================================
# 1
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(
0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
"""
x: [seq_len, batch_size, d_model]
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
# 2
def get_attn_pad_mask(seq_q, seq_k):
# pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量
"""这里的q,k表示的是两个序列(跟注意力机制的q,k没有关系),例如encoder_inputs (x1,x2,..xm)和encoder_inputs (x1,x2..xm)
encoder和decoder都可能调用这个函数,所以seq_len视情况而定
seq_q: [batch_size, seq_len]
seq_k: [batch_size, seq_len]
seq_len could be src_len or it could be tgt_len
seq_len in seq_q and seq_len in seq_k maybe not equal
"""
batch_size, len_q = seq_q.size() # 这个seq_q只是用来expand维度的
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# 例如:seq_k = [[1,2,3,4,0], [1,2,3,5,0]]
# [batch_size, 1, len_k], True is masked
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# [batch_size, len_q, len_k] 构成一个立方体(batch_size个这样的矩阵)
return pad_attn_mask.expand(batch_size, len_q, len_k)
#3
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q: [batch_size, n_heads, len_q, d_k]
K: [batch_size, n_heads, len_k, d_k]
V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask: [batch_size, n_heads, seq_len, seq_len]
说明:在encoder-decoder的Attention层中len_q(q1,..qt)和len_k(k1,...km)可能不同
"""
scores = torch.matmul(Q, K.transpose(-1, -2)) / \
np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
# mask矩阵填充scores(用-1e9填充scores中与attn_mask中值为1位置相对应的元素)
# Fills elements of self tensor with value where mask is True.
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores) # 对最后一个维度(v)做softmax
# scores : [batch_size, n_heads, len_q, len_k] * V: [batch_size, n_heads, len_v(=len_k), d_v]
# context: [batch_size, n_heads, len_q, d_v]
context = torch.matmul(attn, V)
# context:[[z1,z2,...],[...]]向量, attn注意力稀疏矩阵(用于可视化的)
return context, attn
# 4
class MultiHeadAttention(nn.Module):
"""这个Attention类可以实现:
Encoder的Self-Attention
Decoder的Masked Self-Attention
Encoder-Decoder的Attention
输入:seq_len x d_model
输出:seq_len x d_model
"""
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads,
bias=False) # q,k必须维度相同,不然无法做点积
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
def forward(self, input_Q, input_K, input_V, attn_mask):
"""
input_Q: [batch_size, len_q, d_model]
input_K: [batch_size, len_k, d_model]
input_V: [batch_size, len_v(=len_k), d_model]
attn_mask: [batch_size, seq_len, seq_len]
"""
residual, batch_size = input_Q, input_Q.size(0)
# 下面的多头的参数矩阵是放在一起做线性变换的,然后再拆成多个头,这是工程实现的技巧
# B: batch_size, S:seq_len, D: dim
# (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
# 线性变换 拆成多头
# Q: [batch_size, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch_size, -1,
n_heads, d_k).transpose(1, 2)
# K: [batch_size, n_heads, len_k, d_k] # K和V的长度一定相同,维度可以不同
K = self.W_K(input_K).view(batch_size, -1,
n_heads, d_k).transpose(1, 2)
# V: [batch_size, n_heads, len_v(=len_k), d_v]
V = self.W_V(input_V).view(batch_size, -1,
n_heads, d_v).transpose(1, 2)
# 因为是多头,所以mask矩阵要扩充成4维的
# attn_mask: [batch_size, seq_len, seq_len] -> [batch_size, n_heads, seq_len, seq_len]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
# context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask) # 3
# 下面将不同头的输出向量拼接在一起
# context: [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
context = context.transpose(1, 2).reshape(
batch_size, -1, n_heads * d_v)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual), attn
# 5
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention() # 4
# self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, enc_inputs, enc_self_attn_mask):
"""E
enc_inputs: [batch_size, src_len, d_model]
enc_self_attn_mask: [batch_size, src_len, src_len] mask矩阵(pad mask or sequence mask)
"""
# enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len]
# 第一个enc_inputs * W_Q = Q
# 第二个enc_inputs * W_K = K
# 第三个enc_inputs * W_V = V
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,
enc_self_attn_mask) # enc_inputs to same Q,K,V(未线性变换前)
# enc_outputs = self.pos_ffn(enc_outputs)
# enc_outputs: [batch_size, src_len, d_model]
return enc_outputs, attn
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model) # token Embedding
self.pos_emb = PositionalEncoding(d_model) # Transformer中位置编码时固定的,不需要学习
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)]) # 5
def forward(self, enc_inputs):
"""
enc_inputs: [batch_size, src_len]
"""
enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model]
enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose( 0, 1) # [batch_size, src_len, d_model]
# Encoder输入序列的pad mask矩阵
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # [batch_size, src_len, src_len] 2.
enc_self_attns = [] # 在计算中不需要用到,它主要用来保存你接下来返回的attention的值(这个主要是为了你画热力图等,用来看各个词之间的关系
for layer in self.layers: # for循环访问nn.ModuleList对象
# 上一个block的输出enc_outputs作为当前block的输入
# enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask) # 传入的enc_outputs其实是input,传入mask矩阵是因为你要做self attention
enc_self_attns.append(enc_self_attn) # 这个只是为了可视化
return enc_outputs, enc_self_attns
class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
self.encoder = Encoder().to(device)
def forward(self, enc_inputs, dec_inputs):
enc_outputs= self.encoder(enc_inputs)
return enc_outputs
if __name__ == "__main__":
model = Transformer().to(device)
print(model)
# 这里的损失函数里面设置了一个参数 ignore_index=0,因为 "pad" 这个单词的索引为 0,这样设置以后,就不会计算 "pad" 的损失(因为本来 "pad" 也没有意义,不需要计算)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=1e-3,momentum=0.99) # 用adam的话效果不好
# ====================================================================================================
epochs = 1
for epoch in range(epochs):
for enc_inputs, dec_inputs, dec_outputs in loader:
"""
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
dec_outputs: [batch_size, tgt_len]
"""
enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device), dec_inputs.to(device), dec_outputs.to(device)
# outputs: [batch_size * tgt_len, tgt_vocab_size]
enc_outputs = model(enc_inputs, dec_inputs)
运行结果如下:

再看代码2(该代码来自博客《深入解读多头自注意力机制:原理与实践》,腾讯云开发者社区):
import torch
import torch.nn.functional as F
from torch import nn

class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_size, num_heads):
        super(MultiHeadSelfAttention, self).__init__()
        assert embed_size % num_heads == 0
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads
        self.query = nn.Linear(embed_size, embed_size)
        self.key = nn.Linear(embed_size, embed_size)
        self.value = nn.Linear(embed_size, embed_size)
        self.fc_out = nn.Linear(embed_size, embed_size)

    def forward(self, x):
        N, seq_length, embed_size = x.shape
        Q = self.query(x)
        K = self.key(x)
        V = self.value(x)
        Q = Q.view(N, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(N, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(N, seq_length, self.num_heads, self.head_dim).transpose(1, 2)
        attention = F.softmax((Q @ K.transpose(-2, -1)) / (self.head_dim ** 0.5), dim=-1)
        out = attention @ V
        out = out.transpose(1, 2).contiguous().view(N, seq_length, embed_size)
        return self.fc_out(out)

# 测试代码
embed_size = 128
num_heads = 8
seq_length = 10
x = torch.rand((32, seq_length, embed_size))
attention = MultiHeadSelfAttention(embed_size, num_heads)
output = attention(x)
print(output.shape)  # 应输出 (32, seq_length, embed_size),即 (32, 10, 128)
运行结果如下:

下面介绍原理:
该部分原理参考了博客【自然语言处理】Self-Attention机制_self-attention mechanism(CSDN博客)。
2.3.1自注意力机制
Self-Attention 通过计算 Query、Key、Value,让序列中的每个元素与序列的其他元素进行比较,计算它们的相似性。
然后,Self-Attention 根据这个相似性来更新每个元素的表示,使得每个元素的表示不再只是它自己的信息,而是结合了与序列中其他元素的关联信息。
假设输入序列为 {x1,x2,…,xn},每个元素 xi 是一个向量。
具体步骤如下:
(1)计算 Query、Key 和 Value
- 对每个输入 xi,我们生成三个向量:Query (Q),Key (K),和 Value (V)。
- Query (Q) 表示当前元素“想要询问什么”,Key (K) 表示当前元素“可以被询问什么”,Value (V) 承载实际的输入信息。
- 这些向量由输入向量通过三组不同的可学习权重矩阵线性变换得到:
$$Q_i = x_i W_q,\qquad K_i = x_i W_k,\qquad V_i = x_i W_v$$
其中,Wq、Wk、Wv 是可学习的权重矩阵。
(2)计算注意力得分
对第 i 个元素(第 i 个 Query 向量 Qi)来说,要与所有的 Key 向量进行相似性计算,即计算 Qi 与所有 Kj(j=1,2,…,n)之间的相似性得分,通过缩放点积来实现:
$$score_{ij} = \frac{Q_i \cdot K_j}{\sqrt{d_k}}$$
这里,dk 是 Key 向量的维度,缩放因子 √dk 是为了稳定训练过程,防止点积的值过大。
这些得分反映了输入序列中不同元素之间的相关性。
(3)Softmax 归一化
- 计算完所有的相似度后,通过 Softmax 函数对这些得分进行归一化,得到权重分布:
$$\alpha_{ij} = \frac{\exp(score_{ij})}{\sum_{k=1}^{n}\exp(score_{ik})}$$
这一步可以确保所有权重的和为 1。
(4)加权求和得到输出
最后,使用这些归一化后的权重 αij 对 Value 向量 Vj 进行加权求和,得到更新后的表示:
$$z_i = \sum_{j=1}^{n}\alpha_{ij}V_j$$
这样,每个输入 xi 的输出不仅仅依赖于它自身,还依赖于与其他所有输入的关系权重。这使得模型能够更好地捕捉输入序列中远程词语之间的关系。
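按照上面的四个步骤,可以写出一个极简的单头 Self-Attention 示意代码(非原文代码,序列长度和维度均为随意假设的小值):
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
n, d_in, d_k_demo = 4, 16, 8           # 序列长度、输入维度、Q/K/V 维度(假设值)
x = torch.randn(n, d_in)               # 输入序列 {x1, ..., xn},每行是一个元素的向量

# (1) 三组可学习的权重矩阵,分别得到 Q、K、V
W_q = nn.Linear(d_in, d_k_demo, bias=False)
W_k = nn.Linear(d_in, d_k_demo, bias=False)
W_v = nn.Linear(d_in, d_k_demo, bias=False)
Q, K, V = W_q(x), W_k(x), W_v(x)       # [n, d_k]

# (2) 注意力得分:Qi 与所有 Kj 做点积,并除以 sqrt(d_k) 缩放
scores = Q @ K.T / (d_k_demo ** 0.5)   # [n, n]

# (3) Softmax 归一化,使每一行的权重之和为 1
alpha = F.softmax(scores, dim=-1)      # [n, n]

# (4) 用权重对 V 加权求和,得到每个位置更新后的表示
z = alpha @ V                          # [n, d_k]
print(alpha.sum(dim=-1))               # 每行之和为 1
print(z.shape)                         # torch.Size([4, 8])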
2.3.2 多头注意力机制
为了让模型在不同的子空间上捕捉不同的注意力模式,Multi-Head Attention(多头注意力)机制进一步扩展了 Self-Attention。具体来说,多头注意力会将输入的 Query、Key 和 Value 向量分成多个头(head),在每个头上分别执行注意力操作,最后将各头的输出进行拼接并投影回原空间。这种方法使模型可以从多个角度理解输入序列中的不同部分,从而增强模型的表达能力。
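多头的"拆分与拼接"过程可以用下面的形状演示来直观理解(非原文代码,仅展示张量形状的变化,完整实现见前面的 MultiHeadAttention 和 MultiHeadSelfAttention):
import torch

batch_size, seq_len, d_model_demo, n_heads_demo = 2, 5, 512, 8
head_dim = d_model_demo // n_heads_demo    # 512 / 8 = 64,每个头的子空间维度

x = torch.randn(batch_size, seq_len, d_model_demo)   # 线性变换后的 Q(或 K、V)

# 拆分:[B, S, 512] -> [B, S, 8, 64] -> [B, 8, S, 64],每个头在自己的 64 维子空间里做注意力
x_heads = x.view(batch_size, seq_len, n_heads_demo, head_dim).transpose(1, 2)
print(x_heads.shape)     # torch.Size([2, 8, 5, 64])

# 拼接:各头的输出再变回 [B, S, 512],之后接一个线性层投影回原空间
x_merged = x_heads.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model_demo)
print(x_merged.shape)    # torch.Size([2, 5, 512])
print(torch.equal(x, x_merged))   # True,说明拆分与拼接只是形状上的重排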
2.4 Feed Forward (前馈网络)
1.原理
该部分内容参考了博客《Transformer原理详解(图解完整版附代码)》(知乎)和《探秘Transformer系列之(13)--- Feed-Forward Networks》(知乎)。在 Transformer 原文中,Feed Forward 的全称是 Position-wise Feed-Forward Networks(逐位置前馈神经网络,简称 FFN),如下图所示:

FFN 层是一个两层的全连接网络:第一层的激活函数为 ReLU,第二层不使用激活函数;在两个线性变换之间,除了 ReLU 还使用了一个 Dropout。公式如下:
$$\mathrm{FFN}(x) = \max(0,\,xW_1 + b_1)W_2 + b_2$$
- 第一个线性层。其输入 X 是多头注意力的输出,第一个线性层通常会扩展输入的维度,例如把 512 维扩展到 2048 维。这样做是为了使模型能够学习更复杂的函数,也是为了更好地融合前面多头注意力机制的输出内容。
- ReLU 激活:这是一个非线性激活函数,如果输入是负数则返回 0,如果输入是正数则返回输入本身。ReLU 为模型引入非线性,也可以理解为对向量进行筛选,其数学表达为 max(0, xW1 + b1)。
- 第二个线性层。这是第一个线性层的逆操作,将维度降回原始维度,因此 FFN 最终输出矩阵的维度与输入 X 的维度一致。
2.代码
代码如下:
# Pytorch中的Linear只会对最后一维操作,所以正好是我们希望的每个位置用同一个全连接网络
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False)
        )

    def forward(self, inputs):
        """
        inputs: [batch_size, seq_len, d_model]
        """
        residual = inputs
        output = self.fc(inputs)
        # [batch_size, seq_len, d_model]
        return nn.LayerNorm(d_model).to(device)(output + residual)
结合前面的代码如下:
# 数据构建
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# device = 'cpu'
device = 'cuda'
# transformer epochs
epochs = 100
# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E']
]
# 测试集(希望transformer能达到的效果)
# 输入:"我 有 一 个 女 朋 友"
# 输出:"i have a girlfriend"
# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,
'个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
src_idx2word = {i: w for i, w in enumerate(src_vocab)}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 8 # (源句子的长度)enc_input max sequence length
tgt_len = 7 # dec_input(=dec_output) max sequence length
# Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
# FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
d_ff = 2048
d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
n_heads = 8 # number of heads in Multi-Head Attention(有几套头)
# ==============================================================================================
# 数据构建
def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
# [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
enc_inputs.extend(enc_input)
# [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
dec_inputs.extend(dec_input)
# [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
class MyDataSet(Data.Dataset):
"""自定义DataLoader"""
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# ====================================================================================================
# Transformer模型
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
"""
x: [seq_len, batch_size, d_model]
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
def get_attn_pad_mask(seq_q, seq_k):
# pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量
"""这里的q,k表示的是两个序列(跟注意力机制的q,k没有关系),例如encoder_inputs (x1,x2,..xm)和encoder_inputs (x1,x2..xm)
encoder和decoder都可能调用这个函数,所以seq_len视情况而定
seq_q: [batch_size, seq_len]
seq_k: [batch_size, seq_len]
seq_len could be src_len or it could be tgt_len
seq_len in seq_q and seq_len in seq_k maybe not equal
"""
batch_size, len_q = seq_q.size() # 这个seq_q只是用来expand维度的
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# 例如:seq_k = [[1,2,3,4,0], [1,2,3,5,0]]
# [batch_size, 1, len_k], True is masked
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# [batch_size, len_q, len_k] 构成一个立方体(batch_size个这样的矩阵)
return pad_attn_mask.expand(batch_size, len_q, len_k)
# ==========================================================================================
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q: [batch_size, n_heads, len_q, d_k]
K: [batch_size, n_heads, len_k, d_k]
V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask: [batch_size, n_heads, seq_len, seq_len]
说明:在encoder-decoder的Attention层中len_q(q1,..qt)和len_k(k1,...km)可能不同
"""
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
# mask矩阵填充scores(用-1e9填充scores中与attn_mask中值为1位置相对应的元素)
# Fills elements of self tensor with value where mask is True.
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores) # 对最后一个维度(v)做softmax
# scores : [batch_size, n_heads, len_q, len_k] * V: [batch_size, n_heads, len_v(=len_k), d_v]
# context: [batch_size, n_heads, len_q, d_v]
context = torch.matmul(attn, V)
# context:[[z1,z2,...],[...]]向量, attn注意力稀疏矩阵(用于可视化的)
return context, attn
class MultiHeadAttention(nn.Module):
"""这个Attention类可以实现:
Encoder的Self-Attention
Decoder的Masked Self-Attention
Encoder-Decoder的Attention
输入:seq_len x d_model
输出:seq_len x d_model
"""
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads,
bias=False) # q,k必须维度相同,不然无法做点积
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
def forward(self, input_Q, input_K, input_V, attn_mask):
"""
input_Q: [batch_size, len_q, d_model]
input_K: [batch_size, len_k, d_model]
input_V: [batch_size, len_v(=len_k), d_model]
attn_mask: [batch_size, seq_len, seq_len]
"""
residual, batch_size = input_Q, input_Q.size(0)
# 下面的多头的参数矩阵是放在一起做线性变换的,然后再拆成多个头,这是工程实现的技巧
# B: batch_size, S:seq_len, D: dim
# (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
# 线性变换 拆成多头
# Q: [batch_size, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# K: [batch_size, n_heads, len_k, d_k] # K和V的长度一定相同,维度可以不同
K = self.W_K(input_K).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# V: [batch_size, n_heads, len_v(=len_k), d_v]
V = self.W_V(input_V).view(batch_size, -1,n_heads, d_v).transpose(1, 2)
# 因为是多头,所以mask矩阵要扩充成4维的
# attn_mask: [batch_size, seq_len, seq_len] -> [batch_size, n_heads, seq_len, seq_len]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
# context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
# 下面将不同头的输出向量拼接在一起
# context: [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual), attn
# Pytorch中的Linear只会对最后一维操作,所以正好是我们希望的每个位置用同一个全连接网络
class PoswiseFeedForwardNet(nn.Module):
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False)
)
def forward(self, inputs):
"""
inputs: [batch_size, seq_len, d_model]
"""
residual = inputs
output = self.fc(inputs)
# [batch_size, seq_len, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual)
class EncoderLayer(nn.Module):
def __init__(self):
super(EncoderLayer, self).__init__()
self.enc_self_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, enc_inputs, enc_self_attn_mask):
"""E
enc_inputs: [batch_size, src_len, d_model]
enc_self_attn_mask: [batch_size, src_len, src_len] mask矩阵(pad mask or sequence mask)
"""
# enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len]
# 第一个enc_inputs * W_Q = Q
# 第二个enc_inputs * W_K = K
# 第三个enc_inputs * W_V = V
enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,enc_self_attn_mask) # enc_inputs to same Q,K,V(未线性变换前)
enc_outputs = self.pos_ffn(enc_outputs)
# enc_outputs: [batch_size, src_len, d_model]
return enc_outputs, attn
class Encoder(nn.Module):
def __init__(self):
super(Encoder, self).__init__()
self.src_emb = nn.Embedding(src_vocab_size, d_model) # token Embedding
self.pos_emb = PositionalEncoding(d_model) # Transformer中位置编码时固定的,不需要学习
self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
def forward(self, enc_inputs):
"""
enc_inputs: [batch_size, src_len]
"""
enc_outputs = self.src_emb(enc_inputs) # [batch_size, src_len, d_model]
enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1) # [batch_size, src_len, d_model]
# Encoder输入序列的pad mask矩阵
enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs) # [batch_size, src_len, src_len]
enc_self_attns = [] # 在计算中不需要用到,它主要用来保存你接下来返回的attention的值(这个主要是为了你画热力图等,用来看各个词之间的关系
for layer in self.layers: # for循环访问nn.ModuleList对象
# 上一个block的输出enc_outputs作为当前block的输入
# enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
enc_outputs, enc_self_attn = layer(enc_outputs,enc_self_attn_mask) # 传入的enc_outputs其实是input,传入mask矩阵是因为你要做self attention
enc_self_attns.append(enc_self_attn) # 这个只是为了可视化
return enc_outputs, enc_self_attns
class Transformer(nn.Module):
def __init__(self):
super(Transformer, self).__init__()
self.encoder = Encoder().to(device)
def forward(self, enc_inputs, dec_inputs):
"""Transformers的输入:两个序列
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
"""
# enc_outputs: [batch_size, src_len, d_model], enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
# 经过Encoder网络后,得到的输出还是[batch_size, src_len, d_model]
enc_outputs, enc_self_attns = self.encoder(enc_inputs)
return enc_outputs, enc_self_attns
if __name__ == "__main__":
model = Transformer().to(device)
print(model)
# 这里的损失函数里面设置了一个参数 ignore_index=0,因为 "pad" 这个单词的索引为 0,这样设置以后,就不会计算 "pad" 的损失(因为本来 "pad" 也没有意义,不需要计算)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99) # 用adam的话效果不好
# ====================================================================================================
for epoch in range(epochs):
for enc_inputs, dec_inputs, dec_outputs in loader:
"""
enc_inputs: [batch_size, src_len]
dec_inputs: [batch_size, tgt_len]
dec_outputs: [batch_size, tgt_len]
"""
enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device), dec_inputs.to(device), dec_outputs.to(device)
# outputs: [batch_size * tgt_len, tgt_vocab_size]
enc_outputs, enc_self_attns = model(enc_inputs, dec_inputs)
运行结果如下:

阴影部分就是前馈网络层。
3.decoder模块
1.代码
decoder代码如下:
def get_attn_pad_mask(seq_q, seq_k):
# pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量
"""这里的q,k表示的是两个序列(跟注意力机制的q,k没有关系),例如encoder_inputs (x1,x2,..xm)和encoder_inputs (x1,x2..xm)
encoder和decoder都可能调用这个函数,所以seq_len视情况而定
seq_q: [batch_size, seq_len]
seq_k: [batch_size, seq_len]
seq_len could be src_len or it could be tgt_len
seq_len in seq_q and seq_len in seq_k maybe not equal
"""
batch_size, len_q = seq_q.size() # 这个seq_q只是用来expand维度的
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# 例如:seq_k = [[1,2,3,4,0], [1,2,3,5,0]]
# [batch_size, 1, len_k], True is masked
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# [batch_size, len_q, len_k] 构成一个立方体(batch_size个这样的矩阵)
return pad_attn_mask.expand(batch_size, len_q, len_k)
def get_attn_subsequence_mask(seq):
"""建议打印出来看看是什么的输出(一目了然)
seq: [batch_size, tgt_len]
"""
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
# attn_shape: [batch_size, tgt_len, tgt_len]
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 生成一个上三角矩阵
subsequence_mask = torch.from_numpy(subsequence_mask).byte()
return subsequence_mask # [batch_size, tgt_len, tgt_len]
# ==========================================================================================
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q: [batch_size, n_heads, len_q, d_k]
K: [batch_size, n_heads, len_k, d_k]
V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask: [batch_size, n_heads, seq_len, seq_len]
说明:在encoder-decoder的Attention层中len_q(q1,..qt)和len_k(k1,...km)可能不同
"""
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
# mask矩阵填充scores(用-1e9填充scores中与attn_mask中值为1位置相对应的元素)
# Fills elements of self tensor with value where mask is True.
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores) # 对最后一个维度(v)做softmax
# scores : [batch_size, n_heads, len_q, len_k] * V: [batch_size, n_heads, len_v(=len_k), d_v]
# context: [batch_size, n_heads, len_q, d_v]
context = torch.matmul(attn, V)
# context:[[z1,z2,...],[...]]向量, attn注意力稀疏矩阵(用于可视化的)
return context, attn
class MultiHeadAttention(nn.Module):
"""这个Attention类可以实现:
Encoder的Self-Attention
Decoder的Masked Self-Attention
Encoder-Decoder的Attention
输入:seq_len x d_model
输出:seq_len x d_model
"""
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads,bias=False) # q,k必须维度相同,不然无法做点积
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
def forward(self, input_Q, input_K, input_V, attn_mask):
"""
input_Q: [batch_size, len_q, d_model]
input_K: [batch_size, len_k, d_model]
input_V: [batch_size, len_v(=len_k), d_model]
attn_mask: [batch_size, seq_len, seq_len]
"""
residual, batch_size = input_Q, input_Q.size(0)
# 下面的多头的参数矩阵是放在一起做线性变换的,然后再拆成多个头,这是工程实现的技巧
# B: batch_size, S:seq_len, D: dim
# (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
# 线性变换 拆成多头
# Q: [batch_size, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# K: [batch_size, n_heads, len_k, d_k] # K和V的长度一定相同,维度可以不同
K = self.W_K(input_K).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# V: [batch_size, n_heads, len_v(=len_k), d_v]
V = self.W_V(input_V).view(batch_size, -1,n_heads, d_v).transpose(1, 2)
# 因为是多头,所以mask矩阵要扩充成4维的
# attn_mask: [batch_size, seq_len, seq_len] -> [batch_size, n_heads, seq_len, seq_len]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
# context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
# 下面将不同头的输出向量拼接在一起
# context: [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual), attn
# Pytorch中的Linear只会对最后一维操作,所以正好是我们希望的每个位置用同一个全连接网络
class PoswiseFeedForwardNet(nn.Module):
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False)
)
def forward(self, inputs):
"""
inputs: [batch_size, seq_len, d_model]
"""
residual = inputs
output = self.fc(inputs)
# [batch_size, seq_len, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual)
class DecoderLayer(nn.Module):
def __init__(self):
super(DecoderLayer, self).__init__()
self.dec_self_attn = MultiHeadAttention()
self.dec_enc_attn = MultiHeadAttention()
self.pos_ffn = PoswiseFeedForwardNet()
def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
"""
dec_inputs: [batch_size, tgt_len, d_model]
enc_outputs: [batch_size, src_len, d_model]
dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
dec_enc_attn_mask: [batch_size, tgt_len, src_len]
"""
# dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs,dec_self_attn_mask) # 这里的Q,K,V全是Decoder自己的输入
# dec_outputs: [batch_size, tgt_len, d_model], dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs,dec_enc_attn_mask) # Attention层的Q(来自decoder) 和 K,V(来自encoder)
# [batch_size, tgt_len, d_model]
dec_outputs = self.pos_ffn(dec_outputs)
# dec_self_attn, dec_enc_attn这两个是为了可视化的
return dec_outputs, dec_self_attn, dec_enc_attn
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model) # Decoder输入的embed词表
self.pos_emb = PositionalEncoding(d_model)
self.layers = nn.ModuleList([DecoderLayer()
for _ in range(n_layers)]) # Decoder的blocks
def forward(self, dec_inputs, enc_inputs, enc_outputs):
"""
dec_inputs: [batch_size, tgt_len]
enc_inputs: [batch_size, src_len]
enc_outputs: [batch_size, src_len, d_model] # 用在Encoder-Decoder Attention层
"""
dec_outputs = self.tgt_emb(dec_inputs) # [batch_size, tgt_len, d_model]
dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1).to(device) # [batch_size, tgt_len, d_model]
# Decoder输入序列的pad mask矩阵(这个例子中decoder是没有加pad的,实际应用中都是有pad填充的)
dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).to(device) # [batch_size, tgt_len, tgt_len]
# Masked Self_Attention:当前时刻是看不到未来的信息的
dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).to(device) # [batch_size, tgt_len, tgt_len]
# Decoder中把两种mask矩阵相加(既屏蔽了pad的信息,也屏蔽了未来时刻的信息)
dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask),0).to(device) # [batch_size, tgt_len, tgt_len]; torch.gt比较两个矩阵的元素,大于则返回1,否则返回0
# 这个mask主要用于encoder-decoder attention层
# get_attn_pad_mask主要是enc_inputs的pad mask矩阵(因为enc是处理K,V的,求Attention时是用v1,v2,..vm去加权的,要把pad对应的v_i的相关系数设为0,这样注意力就不会关注pad向量)
# dec_inputs只是提供expand的size的
dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs) # [batc_size, tgt_len, src_len]
dec_self_attns, dec_enc_attns = [], []
for layer in self.layers:
# dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len], dec_enc_attn: [batch_size, h_heads, tgt_len, src_len]
# Decoder的Block是上一个Block的输出dec_outputs(变化)和Encoder网络的输出enc_outputs(固定)
dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs, dec_self_attn_mask,dec_enc_attn_mask)
dec_self_attns.append(dec_self_attn)
dec_enc_attns.append(dec_enc_attn)
# dec_outputs: [batch_size, tgt_len, d_model]
return dec_outputs, dec_self_attns, dec_enc_attns
其实不难发现,decoder 用到的注意力模块(MultiHeadAttention、ScaledDotProductAttention)和前馈网络与 encoder 中的完全相同,区别主要在于 Masked Self-Attention 额外使用了上三角的 subsequence mask,以及 Encoder-Decoder Attention 中的 K、V 来自 encoder 的输出,所以在这里不再赘述。
decoder的模型网络结构如下:

2.原理
decoder 的组成模块和 encoder 相似,由 Masked Multi-Head Attention、Encoder-Decoder 的 Multi-Head Attention 和 Feed Forward 前馈网络堆叠而成,不再赘述。
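decoder 与 encoder 最主要的差别在于 Masked Self-Attention 中的掩码:除了 pad mask,还要叠加一个上三角的 subsequence mask,保证当前时刻看不到未来的单词。下面用一个小例子直观地看一下两种 mask 的叠加效果(非原文代码,需要与上面定义的 get_attn_pad_mask、get_attn_subsequence_mask 以及相应的 import 一起运行):
# 以一个长度为5、最后一位是pad(索引0)的目标序列为例
dec_demo = torch.LongTensor([[9, 1, 2, 5, 0]])                 # [batch_size=1, tgt_len=5]

pad_mask = get_attn_pad_mask(dec_demo, dec_demo)               # 屏蔽 pad 所在的列
sub_mask = get_attn_subsequence_mask(dec_demo)                 # 上三角,屏蔽未来时刻
dec_self_attn_mask = torch.gt(pad_mask + sub_mask, 0)          # 两种 mask 取并集

print(dec_self_attn_mask[0].int())
# 期望输出(1 表示被屏蔽):
# tensor([[0, 1, 1, 1, 1],
#         [0, 0, 1, 1, 1],
#         [0, 0, 0, 1, 1],
#         [0, 0, 0, 0, 1],
#         [0, 0, 0, 0, 1]], dtype=torch.int32)   # 最后一列恒为1:pad被屏蔽;上三角为1:未来被屏蔽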
4.transformer代码及运行结果
代码如下:
# ======================================
# === Pytorch手写Transformer完整代码
# ======================================
"""
code by Tae Hwan Jung(Jeff Jung) @graykode, Derek Miller @dmmiller612, modify by shwei
Reference: https://github.com/jadore801120/attention-is-all-you-need-pytorch
https://github.com/JayParks/transformer
"""
# ====================================================================================================
# 数据构建
import math
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
# device = 'cpu'
device = 'cuda'
# transformer epochs
epochs = 100
# 这里我没有用什么大型的数据集,而是手动输入了两对中文→英语的句子
# 还有每个字的索引也是我手动硬编码上去的,主要是为了降低代码阅读难度
# S: Symbol that shows starting of decoding input
# E: Symbol that shows starting of decoding output
# P: Symbol that will fill in blank sequence if current batch data size is short than time steps
# 训练集
sentences = [
# 中文和英语的单词个数不要求相同
# enc_input dec_input dec_output
['我 有 一 个 好 朋 友 P', 'S I have a good friend .', 'I have a good friend . E'],
['我 有 零 个 女 朋 友 P', 'S I have zero girl friend .', 'I have zero girl friend . E'],
['我 有 一 个 男 朋 友 P', 'S I have a boy friend .', 'I have a boy friend . E']
]
# 测试集(希望transformer能达到的效果)
# 输入:"我 有 一 个 女 朋 友"
# 输出:"i have a girlfriend"
# 中文和英语的单词要分开建立词库
# Padding Should be Zero
src_vocab = {'P': 0, '我': 1, '有': 2, '一': 3,
'个': 4, '好': 5, '朋': 6, '友': 7, '零': 8, '女': 9, '男': 10}
src_idx2word = {i: w for i, w in enumerate(src_vocab)}
src_vocab_size = len(src_vocab)
tgt_vocab = {'P': 0, 'I': 1, 'have': 2, 'a': 3, 'good': 4,
'friend': 5, 'zero': 6, 'girl': 7, 'boy': 8, 'S': 9, 'E': 10, '.': 11}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 8 # (源句子的长度)enc_input max sequence length
tgt_len = 7 # dec_input(=dec_output) max sequence length
# Transformer Parameters
d_model = 512 # Embedding Size(token embedding和position编码的维度)
# FeedForward dimension (两次线性层中的隐藏层 512->2048->512,线性层是用来做特征提取的),当然最后会再接一个projection层
d_ff = 2048
d_k = d_v = 64 # dimension of K(=Q), V(Q和K的维度需要相同,这里为了方便让K=V)
n_layers = 6 # number of Encoder of Decoder Layer(Block的个数)
n_heads = 8 # number of heads in Multi-Head Attention(有几套头)
# ==============================================================================================
# 数据构建
def make_data(sentences):
"""把单词序列转换为数字序列"""
enc_inputs, dec_inputs, dec_outputs = [], [], []
for i in range(len(sentences)):
enc_input = [[src_vocab[n] for n in sentences[i][0].split()]]
dec_input = [[tgt_vocab[n] for n in sentences[i][1].split()]]
dec_output = [[tgt_vocab[n] for n in sentences[i][2].split()]]
# [[1, 2, 3, 4, 5, 6, 7, 0], [1, 2, 8, 4, 9, 6, 7, 0], [1, 2, 3, 4, 10, 6, 7, 0]]
enc_inputs.extend(enc_input)
# [[9, 1, 2, 3, 4, 5, 11], [9, 1, 2, 6, 7, 5, 11], [9, 1, 2, 3, 8, 5, 11]]
dec_inputs.extend(dec_input)
# [[1, 2, 3, 4, 5, 11, 10], [1, 2, 6, 7, 5, 11, 10], [1, 2, 3, 8, 5, 11, 10]]
dec_outputs.extend(dec_output)
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
class MyDataSet(Data.Dataset):
"""自定义DataLoader"""
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__()
self.enc_inputs = enc_inputs
self.dec_inputs = dec_inputs
self.dec_outputs = dec_outputs
def __len__(self):
return self.enc_inputs.shape[0]
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx]
loader = Data.DataLoader(
MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
# ====================================================================================================
# Transformer模型
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(
0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
"""
x: [seq_len, batch_size, d_model]
"""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
def get_attn_pad_mask(seq_q, seq_k):
# pad mask的作用:在对value向量加权平均的时候,可以让pad对应的alpha_ij=0,这样注意力就不会考虑到pad向量
"""这里的q,k表示的是两个序列(跟注意力机制的q,k没有关系),例如encoder_inputs (x1,x2,..xm)和encoder_inputs (x1,x2..xm)
encoder和decoder都可能调用这个函数,所以seq_len视情况而定
seq_q: [batch_size, seq_len]
seq_k: [batch_size, seq_len]
seq_len could be src_len or it could be tgt_len
seq_len in seq_q and seq_len in seq_k maybe not equal
"""
batch_size, len_q = seq_q.size() # 这个seq_q只是用来expand维度的
batch_size, len_k = seq_k.size()
# eq(zero) is PAD token
# 例如:seq_k = [[1,2,3,4,0], [1,2,3,5,0]]
# [batch_size, 1, len_k], True is masked
pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)
# [batch_size, len_q, len_k] 构成一个立方体(batch_size个这样的矩阵)
return pad_attn_mask.expand(batch_size, len_q, len_k)
def get_attn_subsequence_mask(seq):
"""建议打印出来看看是什么的输出(一目了然)
seq: [batch_size, tgt_len]
"""
attn_shape = [seq.size(0), seq.size(1), seq.size(1)]
# attn_shape: [batch_size, tgt_len, tgt_len]
subsequence_mask = np.triu(np.ones(attn_shape), k=1) # 生成一个上三角矩阵
subsequence_mask = torch.from_numpy(subsequence_mask).byte()
return subsequence_mask # [batch_size, tgt_len, tgt_len]
# ==========================================================================================
class ScaledDotProductAttention(nn.Module):
def __init__(self):
super(ScaledDotProductAttention, self).__init__()
def forward(self, Q, K, V, attn_mask):
"""
Q: [batch_size, n_heads, len_q, d_k]
K: [batch_size, n_heads, len_k, d_k]
V: [batch_size, n_heads, len_v(=len_k), d_v]
attn_mask: [batch_size, n_heads, seq_len, seq_len]
说明:在encoder-decoder的Attention层中len_q(q1,..qt)和len_k(k1,...km)可能不同
"""
scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k) # scores : [batch_size, n_heads, len_q, len_k]
# mask矩阵填充scores(用-1e9填充scores中与attn_mask中值为1位置相对应的元素)
# Fills elements of self tensor with value where mask is True.
scores.masked_fill_(attn_mask, -1e9)
attn = nn.Softmax(dim=-1)(scores) # 对最后一个维度(v)做softmax
# scores : [batch_size, n_heads, len_q, len_k] * V: [batch_size, n_heads, len_v(=len_k), d_v]
# context: [batch_size, n_heads, len_q, d_v]
context = torch.matmul(attn, V)
# context:[[z1,z2,...],[...]]向量, attn注意力稀疏矩阵(用于可视化的)
return context, attn
class MultiHeadAttention(nn.Module):
"""这个Attention类可以实现:
Encoder的Self-Attention
Decoder的Masked Self-Attention
Encoder-Decoder的Attention
输入:seq_len x d_model
输出:seq_len x d_model
"""
def __init__(self):
super(MultiHeadAttention, self).__init__()
self.W_Q = nn.Linear(d_model, d_k * n_heads,bias=False) # q,k必须维度相同,不然无法做点积
self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
def forward(self, input_Q, input_K, input_V, attn_mask):
"""
input_Q: [batch_size, len_q, d_model]
input_K: [batch_size, len_k, d_model]
input_V: [batch_size, len_v(=len_k), d_model]
attn_mask: [batch_size, seq_len, seq_len]
"""
residual, batch_size = input_Q, input_Q.size(0)
# 下面的多头的参数矩阵是放在一起做线性变换的,然后再拆成多个头,这是工程实现的技巧
# B: batch_size, S:seq_len, D: dim
# (B, S, D) -proj-> (B, S, D_new) -split-> (B, S, Head, W) -trans-> (B, Head, S, W)
# 线性变换 拆成多头
# Q: [batch_size, n_heads, len_q, d_k]
Q = self.W_Q(input_Q).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# K: [batch_size, n_heads, len_k, d_k] # K和V的长度一定相同,维度可以不同
K = self.W_K(input_K).view(batch_size, -1,n_heads, d_k).transpose(1, 2)
# V: [batch_size, n_heads, len_v(=len_k), d_v]
V = self.W_V(input_V).view(batch_size, -1,n_heads, d_v).transpose(1, 2)
# 因为是多头,所以mask矩阵要扩充成4维的
# attn_mask: [batch_size, seq_len, seq_len] -> [batch_size, n_heads, seq_len, seq_len]
attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)
# context: [batch_size, n_heads, len_q, d_v], attn: [batch_size, n_heads, len_q, len_k]
context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)
# 下面将不同头的输出向量拼接在一起
# context: [batch_size, n_heads, len_q, d_v] -> [batch_size, len_q, n_heads * d_v]
context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)
# 这个全连接层可以保证多头attention的输出仍然是seq_len x d_model
output = self.fc(context) # [batch_size, len_q, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual), attn
# Pytorch中的Linear只会对最后一维操作,所以正好是我们希望的每个位置用同一个全连接网络
class PoswiseFeedForwardNet(nn.Module):
def __init__(self):
super(PoswiseFeedForwardNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(d_model, d_ff, bias=False),
nn.ReLU(),
nn.Linear(d_ff, d_model, bias=False)
)
def forward(self, inputs):
"""
inputs: [batch_size, seq_len, d_model]
"""
residual = inputs
output = self.fc(inputs)
# [batch_size, seq_len, d_model]
return nn.LayerNorm(d_model).to(device)(output + residual)
class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()
    def forward(self, enc_inputs, enc_self_attn_mask):
        """
        enc_inputs: [batch_size, src_len, d_model]
        enc_self_attn_mask: [batch_size, src_len, src_len]  mask matrix (pad mask or sequence mask)
        """
        # enc_outputs: [batch_size, src_len, d_model], attn: [batch_size, n_heads, src_len, src_len]
        # The first enc_inputs is multiplied by W_Q to give Q,
        # the second by W_K to give K,
        # and the third by W_V to give V.
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,
                                               enc_self_attn_mask)  # the same enc_inputs (before the linear projections) is used for Q, K and V
        enc_outputs = self.pos_ffn(enc_outputs)
        # enc_outputs: [batch_size, src_len, d_model]
        return enc_outputs, attn
class DecoderLayer(nn.Module):
    def __init__(self):
        super(DecoderLayer, self).__init__()
        self.dec_self_attn = MultiHeadAttention()
        self.dec_enc_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()
    def forward(self, dec_inputs, enc_outputs, dec_self_attn_mask, dec_enc_attn_mask):
        """
        dec_inputs: [batch_size, tgt_len, d_model]
        enc_outputs: [batch_size, src_len, d_model]
        dec_self_attn_mask: [batch_size, tgt_len, tgt_len]
        dec_enc_attn_mask: [batch_size, tgt_len, src_len]
        """
        # dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len]
        dec_outputs, dec_self_attn = self.dec_self_attn(dec_inputs, dec_inputs, dec_inputs,
                                                        dec_self_attn_mask)  # Q, K and V all come from the Decoder's own input
        # dec_outputs: [batch_size, tgt_len, d_model], dec_enc_attn: [batch_size, n_heads, tgt_len, src_len]
        dec_outputs, dec_enc_attn = self.dec_enc_attn(dec_outputs, enc_outputs, enc_outputs,
                                                      dec_enc_attn_mask)  # Q comes from the decoder; K and V come from the encoder
        # [batch_size, tgt_len, d_model]
        dec_outputs = self.pos_ffn(dec_outputs)
        # dec_self_attn and dec_enc_attn are returned only for visualization
        return dec_outputs, dec_self_attn, dec_enc_attn
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Embedding(src_vocab_size, d_model)  # token embedding
        self.pos_emb = PositionalEncoding(d_model)  # the positional encoding in Transformer is fixed, not learned
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
    def forward(self, enc_inputs):
        """
        enc_inputs: [batch_size, src_len]
        """
        enc_outputs = self.src_emb(enc_inputs)  # [batch_size, src_len, d_model]
        enc_outputs = self.pos_emb(enc_outputs.transpose(0, 1)).transpose(0, 1)  # [batch_size, src_len, d_model]
        # Pad mask matrix for the Encoder's input sequence
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)  # [batch_size, src_len, src_len]
        # Not needed for the computation itself; it only collects the attention weights returned below
        # (useful for plotting heat maps of the relations between words)
        enc_self_attns = []
        for layer in self.layers:  # iterate over the nn.ModuleList
            # The output enc_outputs of the previous block is the input of the current block
            # enc_outputs: [batch_size, src_len, d_model], enc_self_attn: [batch_size, n_heads, src_len, src_len]
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)  # the mask is passed in because this is self-attention
            enc_self_attns.append(enc_self_attn)  # only for visualization
        return enc_outputs, enc_self_attns
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.tgt_emb = nn.Embedding(tgt_vocab_size, d_model)  # embedding table for the Decoder input
        self.pos_emb = PositionalEncoding(d_model)
        self.layers = nn.ModuleList([DecoderLayer()
                                     for _ in range(n_layers)])  # the Decoder blocks
    def forward(self, dec_inputs, enc_inputs, enc_outputs):
        """
        dec_inputs: [batch_size, tgt_len]
        enc_inputs: [batch_size, src_len]
        enc_outputs: [batch_size, src_len, d_model]  # used in the Encoder-Decoder attention layer
        """
        dec_outputs = self.tgt_emb(dec_inputs)  # [batch_size, tgt_len, d_model]
        dec_outputs = self.pos_emb(dec_outputs.transpose(0, 1)).transpose(0, 1).to(device)  # [batch_size, tgt_len, d_model]
        # Pad mask for the Decoder input sequence (the decoder input in this toy example has no padding,
        # but real applications usually do)
        dec_self_attn_pad_mask = get_attn_pad_mask(dec_inputs, dec_inputs).to(device)  # [batch_size, tgt_len, tgt_len]
        # Masked self-attention: the current position must not see future positions
        dec_self_attn_subsequence_mask = get_attn_subsequence_mask(dec_inputs).to(device)  # [batch_size, tgt_len, tgt_len]
        # The Decoder adds the two masks together, so both the pad tokens and the future positions are blocked.
        # torch.gt(x, 0) returns True where an element is greater than 0 and False otherwise
        # (a tiny numeric illustration follows right after this class).
        dec_self_attn_mask = torch.gt((dec_self_attn_pad_mask + dec_self_attn_subsequence_mask),
                                      0).to(device)  # [batch_size, tgt_len, tgt_len]
        # This mask is used by the encoder-decoder attention layer.
        # get_attn_pad_mask builds the pad mask of enc_inputs: the encoder side provides K and V,
        # and the attention output is a weighted sum of v1, v2, ..., vm, so the weights of the pad
        # positions must be forced to 0 so that attention never attends to pad vectors.
        # dec_inputs only provides the size to expand to.
        dec_enc_attn_mask = get_attn_pad_mask(dec_inputs, enc_inputs)  # [batch_size, tgt_len, src_len]
        dec_self_attns, dec_enc_attns = [], []
        for layer in self.layers:
            # dec_outputs: [batch_size, tgt_len, d_model], dec_self_attn: [batch_size, n_heads, tgt_len, tgt_len], dec_enc_attn: [batch_size, n_heads, tgt_len, src_len]
            # Each Decoder block takes the output dec_outputs of the previous block (changing)
            # and the Encoder output enc_outputs (fixed).
            dec_outputs, dec_self_attn, dec_enc_attn = layer(dec_outputs, enc_outputs,
                                                             dec_self_attn_mask, dec_enc_attn_mask)
            dec_self_attns.append(dec_self_attn)
            dec_enc_attns.append(dec_enc_attn)
        # dec_outputs: [batch_size, tgt_len, d_model]
        return dec_outputs, dec_self_attns, dec_enc_attns
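# A tiny numeric illustration of how the Decoder combines its two masks (made-up 3x3 matrices,
# not produced by the real mask functions): torch.gt(pad_mask + subsequence_mask, 0) is True
# wherever either mask marks a position, so pad positions and future positions are both blocked.
_pad_demo = torch.tensor([[0, 0, 1],
                          [0, 0, 1],
                          [0, 0, 1]])
_future_demo = torch.tensor([[0, 1, 1],
                             [0, 0, 1],
                             [0, 0, 0]])
_combined_demo = torch.gt(_pad_demo + _future_demo, 0)
# _combined_demo:
# [[False,  True,  True],
#  [False, False,  True],
#  [False, False,  True]]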
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.encoder = Encoder().to(device)
        self.decoder = Decoder().to(device)
        self.projection = nn.Linear(d_model, tgt_vocab_size, bias=False).to(device)
    def forward(self, enc_inputs, dec_inputs):
        """The Transformer takes two sequences as input:
        enc_inputs: [batch_size, src_len]
        dec_inputs: [batch_size, tgt_len]
        """
        # enc_outputs: [batch_size, src_len, d_model], enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
        # After the Encoder the shape is still [batch_size, src_len, d_model]
        enc_outputs, enc_self_attns = self.encoder(enc_inputs)
        # dec_outputs: [batch_size, tgt_len, d_model], dec_self_attns: [n_layers, batch_size, n_heads, tgt_len, tgt_len], dec_enc_attns: [n_layers, batch_size, tgt_len, src_len]
        dec_outputs, dec_self_attns, dec_enc_attns = self.decoder(dec_inputs, enc_inputs, enc_outputs)
        # dec_outputs: [batch_size, tgt_len, d_model] -> dec_logits: [batch_size, tgt_len, tgt_vocab_size]
        dec_logits = self.projection(dec_outputs)
        return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns, dec_self_attns, dec_enc_attns
model = Transformer().to(device)
# The loss function is given ignore_index=0 because the index of the pad token 'P' is 0;
# with this setting the loss at pad positions is not computed (pad carries no meaning anyway).
criterion = nn.CrossEntropyLoss(ignore_index=0)
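# A tiny worked example of ignore_index (an illustrative sketch with made-up numbers, unrelated
# to the model): positions whose target index is 0 ('P', the pad token) are skipped by the loss.
_demo_logits = torch.tensor([[2.0, 0.5, 0.1],
                             [0.3, 1.5, 0.2]])   # two positions, three "classes"
_demo_targets = torch.tensor([0, 1])             # the first target is the pad index 0
_demo_loss = nn.CrossEntropyLoss(ignore_index=0)(_demo_logits, _demo_targets)
# only the second position contributes to _demo_loss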
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)  # Adam does not work well on this toy task
# ====================================================================================================
for epoch in range(epochs):
    for enc_inputs, dec_inputs, dec_outputs in loader:
        """
        enc_inputs: [batch_size, src_len]
        dec_inputs: [batch_size, tgt_len]
        dec_outputs: [batch_size, tgt_len]
        """
        enc_inputs, dec_inputs, dec_outputs = enc_inputs.to(device), dec_inputs.to(device), dec_outputs.to(device)
        # outputs: [batch_size * tgt_len, tgt_vocab_size]
        outputs, enc_self_attns, dec_self_attns, dec_enc_attns = model(enc_inputs, dec_inputs)
        # dec_outputs.view(-1): [batch_size * tgt_len]
        loss = criterion(outputs, dec_outputs.view(-1))
        print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
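# Optionally persist the trained weights so that inference can be run later without retraining.
# The file name below is an arbitrary assumption, not something prescribed by this tutorial.
torch.save(model.state_dict(), 'transformer_demo.pt')
# To restore later: model.load_state_dict(torch.load('transformer_demo.pt'))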
def greedy_decoder(model, enc_input, start_symbol):
    """Greedy decoding.
    For simplicity, a greedy decoder is just beam search with K=1. It is needed at inference time
    because the target sequence is unknown: the target is generated word by word, and each newly
    predicted word is fed back into the transformer.
    Starting reference: http://nlp.seas.harvard.edu/2018/04/03/attention.html#greedy-decoding
    :param model: Transformer model
    :param enc_input: the encoder input
    :param start_symbol: the start symbol; in this example it is 'S', i.e. tgt_vocab['S']
    :return: the predicted target sequence
    """
    enc_outputs, enc_self_attns = model.encoder(enc_input)
    # Initialize an empty tensor: tensor([], size=(1, 0), dtype=torch.int64)
    dec_input = torch.zeros(1, 0).type_as(enc_input.data)
    terminal = False
    next_symbol = start_symbol
    while not terminal:
        # At inference time dec_input grows step by step (one newly predicted word per iteration)
        dec_input = torch.cat([dec_input.to(device), torch.tensor([[next_symbol]], dtype=enc_input.dtype).to(device)],
                              -1)
        dec_outputs, _, _ = model.decoder(dec_input, enc_input, enc_outputs)
        projected = model.projection(dec_outputs)
        prob = projected.squeeze(0).max(dim=-1, keepdim=False)[1]
        # Incremental update: predictions for the words already generated should stay the same,
        # so only the newest predicted word is taken and appended to the input sequence.
        # The output z_t at the latest position x'_t is used to predict the next word;
        # z_1, z_2, ..., z_{t-1} are ignored.
        next_word = prob.data[-1]
        next_symbol = next_word
        if next_symbol == tgt_vocab["E"]:
            terminal = True
    # (an alternative would be to also append the final 'E' symbol before returning)
    greedy_dec_predict = dec_input[:, 1:]  # drop the leading start symbol 'S'
    return greedy_dec_predict
# ==========================================================================================
# Inference stage
# Test set
sentences = [
    # enc_input             dec_input  dec_output
    ['我 有 零 个 女 朋 友 P', '', '']
]
enc_inputs, dec_inputs, dec_outputs = make_data(sentences)
test_loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), 2, True)
enc_inputs, _, _ = next(iter(test_loader))
print()
print("=" * 30)
print("Translating the Chinese sentence '我 有 零 个 女 朋 友' into English with the trained Transformer model:")
for i in range(len(enc_inputs)):
    greedy_dec_predict = greedy_decoder(model, enc_inputs[i].view(1, -1).to(device), start_symbol=tgt_vocab["S"])
    print(enc_inputs[i], '->', greedy_dec_predict.squeeze())
    print([src_idx2word[t.item()] for t in enc_inputs[i]], '->',
          [idx2word[n.item()] for n in greedy_dec_predict.squeeze()])
The output of running the script looks like this:
