大模型 Transformer模型(上)
这篇主要讲一下Transformer 框架搭建流程,苯人总结了一套:Dataprocess 数据处理 --》Position位置编码 --》Mask掩码 --》MHA多头注意力机制 --》FFN前馈神经网络 --》Encoder编码器 --》Decoder解码器 --》Transformer 模型构建 --》模型训练 --》模型预测。
目录
这篇主要讲一下Transformer 框架搭建流程,苯人总结了一套:
Dataprocess 数据处理 --》Position位置编码 --》Mask掩码 --》MHA多头注意力机制 --》FFN前馈神经网络 --》Encoder编码器 --》Decoder解码器 --》Transformer 模型构建 --》模型训练 --》模型预测
一、Dataprocess 数据处理
数据处理的基本流程就是:拿到文本序列(即任务设定,比如德英翻译)--》定义特殊符号 --》构建词表 --》将文本编码为整数序列 --》自定义数据集以及数据加载器 --》代码调试
代码如下:
import torch
import torch.utils.data as Data
#定义特殊符号
# S: decoding input 的开始符
# E: decoding output 的结束符
# P: padding的占位符
# 法语译英文
#第一列是编码器输入的原始文本序列,第二列是解码器的输入(S开头的译文),第三列是解码器的输出(E结尾的译文)
sentences = [
# enc_input dec_input dec_output
['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],
['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]
# 构建词汇表
#原始输入文本序列的词汇表
src_vocab = {'P' : 0, 'ich' : 1, 'mochte' : 2, 'ein' : 3, 'bier' : 4, 'cola' : 5}
src_vocab_size = len(src_vocab)
src_idx2word = {i: w for i, w in enumerate(src_vocab)}
#译文的词汇表
tgt_vocab = {'P' : 0, 'i' : 1, 'want' : 2, 'a' : 3, 'beer' : 4, 'coke' : 5, 'S' : 6, 'E' : 7, '.' : 8}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)
src_len = 5 # enc_input最大序列长度(源序列长度)
tgt_len = 6 # dec_input(等于dec_output)最大序列长度(目标序列长度)
# Transformer 参数
d_model = 512 # 嵌入维度大小
d_ff = 2048 # 前馈网络的维度大小
d_k = d_v = 64 # K(等于Q)和V的维度大小
n_layers = 6 # 编码器和解码器层的数量
n_heads = 8 # 多头注意力机制中的头数
# 将传入编码器的文本转为数字序列
def make_data(sentences):
enc_inputs, dec_inputs, dec_outputs = [], [], []
# 二维列表所以套两个循环
for i in range(len(sentences)):
enc_input = [src_vocab[n] for n in sentences[i][0].split()]
dec_input = [tgt_vocab[n] for n in sentences[i][1].split()]
dec_output = [tgt_vocab[n] for n in sentences[i][2].split()]
'''
首先len(sentences)为2,range(len(sentences))就是(0,2),所以i取0和1,分别表示sentences的第一行和第二行
然后比如sentences[i][0],当i=0时就取的是sentences的第0行第0列,也就是'ich mochte ein bier P'这句,用split按空格分隔后再用 for n 来取到每个词
最后 src_vocab[n]拿到每个词对应的索引,加入到创建的空列表,比如此时 enc_input = [1, 2, 3, 4, 0]
'''
enc_inputs.append(enc_input)
dec_inputs.append(dec_input)
dec_outputs.append(dec_output)
#最后转为张量
return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)
enc_inputs, dec_inputs, dec_outputs = make_data(sentences) #保证了三种数据的形状一致
# print(enc_inputs)
# print(enc_inputs.shape)
# 定义自定义数据集类 MyDataSet
#这里注意我们的数据集结构是:一条样本 = enc_input + dec_input + dec_output 这三部分组成的一套输入输出组
class MyDataSet(Data.Dataset):
#1、定义好数据放哪
def __init__(self, enc_inputs, dec_inputs, dec_outputs):
super(MyDataSet, self).__init__() # 调用父类的初始化方法
#把三块输入保存下来,类似于装进一个大书包
self.enc_inputs = enc_inputs # 初始化编码器输入数据
self.dec_inputs = dec_inputs # 初始化解码器输入数据
self.dec_outputs = dec_outputs # 初始化解码器输出数据
# 2、告诉一共有多少条数据
def __len__(self):
return self.enc_inputs.shape[0] # 返回数据集样本数量,shape[0]表示有多少行,即多少条样本
#这里只返回 enc_inputs 的shape[0]是因为之前已经确定了三种数据的形状都是一致的,所以只用返回一组
# 3、告诉怎么拿到某一条数据
def __getitem__(self, idx):
return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx] # 获取指定索引处的样本数据
#比如 idx = 0,则第一条数据集中的样本就是:[1, 2, 3, 4, 0],[6, 1, 2, 3, 4, 8],[1, 2, 3, 4, 8, 7]
# 创建 DataLoader 对象 loader,用于批量加载数据
loader = Data.DataLoader(
MyDataSet(enc_inputs, dec_inputs, dec_outputs), # 自定义数据集对象作为数据源
batch_size=2, # 每个批次的样本数量
shuffle=True # 是否打乱数据集顺序,True 表示打乱
)
# 代码调试
if __name__ == '__main__':
for i, (src_seq, tgt_in_seq, tgt_out_seq) in enumerate(loader):
print(src_seq)
print(tgt_in_seq)
print(tgt_out_seq)
break
相关介绍都写在注释里了
运行结果:
二、Position 位置编码
位置编码的代码其实是根据公式来的:
下面是一段模版代码:
# 定义一个位置编码类
import math
import torch
from torch import nn
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
"""
:param d_model: 词向量的维度
:param dropout: 丢弃比例
:param max_len: 预定义一个最大序列长度
"""
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout) # 定义一个 Dropout 层,用于随机丢弃部分数据,防止过拟合
# 初始化位置编码矩阵 pe,用来保存每个位置的编码向量
pe = torch.zeros(max_len, d_model)
# # shape: (50, 1) * (256,) → (50, 256)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # 生成位置的下标,shape: (max_len, 1),升维是为了后面好广播
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # 计算分母项
pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置使用 sin 函数编码位置信息
pe[:, 1::2] = torch.cos(position * div_term) # 奇数位置使用 cos 函数编码位置信息
pe = pe.unsqueeze(0) # 转置并增加一个维度,shape: ( 1, max_len,d_model)
self.register_buffer('pe', pe) # 将位置编码矩阵 pe 注册为模型的缓冲区,不算模型的参数,但希望它随着模型保存、加载
def forward(self, x):
'''
x: [batch_size, seq_len, d_model]
'''
x = x + self.pe[:,:x.size(1), :] # 将输入张量 x 与位置编码矩阵 pe 相加,根据输入序列长度截取对应位置编码
return self.dropout(x) # 对相加后的张量进行 Dropout 操作并返回
# 测试代码
if __name__ == '__main__':
pe = PositionalEncoding(d_model=512)
embed = torch.randn(5, 10, 512)
out = pe(embed)
print(out)
print(out.shape) #torch.Size([5, 10, 512])
这里我就直接用老师发的模版代码了 (๑•̀ㅂ•́)و✧
三、Mask 掩码
掩码的两个作用:一个是为了掩盖之前为了固定长度而填充的0(填充掩码),一个是为了掩盖未来的信息(未来掩码),这里详细解释一下未来掩码的原理:
在利用公式计算出注意力分数后(经过softmax激活函数之前)模型会应用一个上三角矩阵,这个上三角矩阵的值通常为负无穷,与原注意力分数相加再经过激活函数后会变为0,下一步在与 V 相乘后得到的实际值仍为0,即不会保留这部分的信息,过程大概如下:
这里解释一下为什么是上三角矩阵呢,是因为这里把“未来”定义成“当前位置右边的 token”,所以就遮住的是上三角部分。
代码如下:
"""
构建所需要的掩码
每个attention都需要mask,只是不同部分所需要的Mask不同
"""
import numpy as np
import torch
def att_pad_mask(seq_k, seq_q):
"""
填充掩码
特别情况:交叉注意力机制
:param seq_k: [b,n1] tensor([[1, 2, 3, 4, 0],[1, 2, 3, 5, 0]])
:param seq_q: [b,n2]
:return:数据格式【b,n2,n1]
"""
batch_size, len_q = seq_q.size()
# 跟sk里面每个数据比较 为0返回True
mask = seq_k.eq(0).unsqueeze(1) # 形状由 [b,n1] -- > [b,1,n1]
mask = mask.repeat(1, len_q, 1) # 【b,n2,n1]
return mask #形状 [batch_size, len_q, len_k] 的布尔矩阵
def att_sub_mask(seq):
"""
未来掩码
:param seq: [b,n]
:return: [b,n,n]
"""
att_shape = [seq.size(0), seq.size(1), seq.size(1)] #构造形状为[batch_size, n, n] 的 mask
sub_mask = np.triu(np.ones(att_shape), k=1) #构建一个上三角矩阵
sub_mask = torch.from_numpy(sub_mask).byte() #从 NumPy 转换为 PyTorch
return sub_mask #输出形状为[batch_size, n, n],未来位置是 1(True),当前和过去是 0(False)
# 测试数据
if __name__ == '__main__':
import torch
seq_q = torch.tensor([[1, 2, 3, 0], [1, 2, 3, 0]])
seq_k = torch.tensor([[1, 2, 3, 0], [1, 2, 3, 0]])
print(att_pad_mask(seq_q, seq_k))
re1 = att_pad_mask(seq_q, seq_k) #填充掩码
print(att_sub_mask(seq_q))
re2 = att_sub_mask(seq_k) #未来掩码
# 合并掩码,如果某位置为0,表示“既不是pad,也不是未来”,是可以看的;否则是要mask掉的
# gt的作用是只要不是0,就返回True,即为要遮住的地方
mask_self = torch.gt((re1 + re2), 0)
print(mask_self)
运行结果这里就不贴了,可自行运行
四、MHA 多头注意力机制
多头注意力机制(Multi-Head Attention)是 Transformer 模型中的核心结构,它的设计灵感来自人类“关注多个事物不同方面”的能力,简单来说,它的本质是:在同一个输入上,设置多个“注意力头”来并行地捕捉不同的语义特征或关系。每一个注意力头其实就是一个缩小版的“注意力机制”,它会独立地去计算输入序列中各个位置之间的依赖关系,当然最后会将每个头的结果合并起来然后返回。
实现代码如下:
'''
封装注意力机制 :多头注意力结构+交叉输入 = 多头交叉注意力机制
'''
import math
import torch
from torch import nn
#单头注意力机制 Attention类,实现注意力得分的计算
class Attention(nn.Module):
"""
注意力分数计算公式:Q * K的转置 / 根号下dk
"""
def __init__(self,dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
self.softmax = nn.Softmax(dim=-1)
def forward(self,q,k,v,mask=None):
# 按照公式算注意力机制
scores = torch.matmul(q,k.transpose(-1,-2))/math.sqrt(k.size(-1))
# 判断掩码
if mask is not None:
# 根据mask判断哪个值是true 然后乘以-1e9,一个超大负数
scores = scores.masked_fill_(mask, -1e9)
#注意力得分经过softmax激活函数
att = self.softmax(scores)
#拿到最终加权的实际值
output = torch.matmul(att,v)
return self.dropout(output)
#多头注意力机制 MultiHeadAttention类,分成多个头,每个头都调用Attention类,最后合并结果
class MultiHeadAttention(nn.Module):
def __init__(self,d_model,num_heads):
super().__init__()
self.d_model = d_model
self.num_heads = num_heads
self.d_k = d_model//num_heads #每个头的维度
#初始化权重矩阵
self.Wq = nn.Linear(d_model,d_model)
self.Wk = nn.Linear(d_model,d_model)
self.Wv = nn.Linear(d_model,d_model)
self.Wo = nn.Linear(d_model, d_model)
self.attention = Attention()
self.dropout = nn.Dropout(0.1)
self.layer_norm = nn.LayerNorm(d_model)
def forward(self,enc_inputs,dec_inputs,mask=None):
res = dec_inputs
# 交叉输入:这里要注意 Q是来自解码器,K、V来自编码器
q = self.Wq(dec_inputs)
k = self.Wk(enc_inputs)
v = self.Wv(enc_inputs)
# 多头的实现,实际就是拆分QKV
Q = q.view(q.size(0),-1,self.num_heads,self.d_k).transpose(1,2)
K = k.view(k.size(0),-1,self.num_heads,self.d_k).transpose(1,2)
V = v.view(v.size(0),-1,self.num_heads,self.d_k).transpose(1,2)
# 处理mask的维度:[b,n,n]--->【b,h,n,n】,确保每个头都能使用
# repeat 是重复几次的方法
if mask is not None:
mask = mask.unsqueeze(1).repeat(1,self.num_heads,1,1)
# 计算注意力 形状是:[batch, heads, seq_len, d_k]
output = self.attention(Q,K,V,mask)
# 多头处理
#首先合并多个头,形状变回来:[batch, heads, seq_len, d_k] → [batch, seq_len, heads * d_k]
output = output.transpose(1,2).contiguous().view(output.size(0),-1,self.d_model)
output = self.Wo(output) #恢复为 d_model 的 shape
output = self.dropout(output)
output = self.layer_norm( output+ res) #残差连接+层归一化
return output
if __name__ == '__main__':
# 测试
# 上个代码生成的掩码
mask = [[[False, True, True, True],
[False, False, True, True],
[False, False, False, True],
[False, False, False, True]],
[[False, True, True, True],
[False, False, True, True],
[False, False, False, True],
[False, False, False, True]]]
mask = torch.tensor(mask)
mha = MultiHeadAttention(d_model=512,num_heads=8)
#输入:[batch=2, seq_len=4, d_model=512]
enc_inputs = torch.randn(2,4,512)
dec_inputs = torch.randn(2,4,512)
output = mha(enc_inputs,dec_inputs,mask)
print(output.shape) #torch.Size([2, 4, 512])
可以看到最后的数据形状还是与输入一样,只是表达效果更强了
五、FFN 前馈神经网络
前馈神经网络(Feed-Forward Neural Network, FFN)是一种基础构建模块,像一个勤劳的“信息加工厂”,通过多层线性变换和非线性激活函数,将输入数据逐层加工成更有意义的表示,一般是像个三明治一样两个线性层中间夹一层激活函数,具体代码如下:
"""
搭建FFN子层
"""
import torch
from torch import nn
#全连接前馈网络 像三明治一样两个线性层中间夹一层激活函数
class FFN(nn.Module):
def __init__(self,d_model,d_ff):
super(FFN, self).__init__()
self.ffn = nn.Sequential(
nn.Linear(d_model,d_ff), #把输入维度从 d_model(通常是512)升维到 d_ff(通常是2048)
nn.ReLU(), #激活函数
nn.Dropout(0.1),
nn.Linear(d_ff,d_model) #把维度从 d_ff 降回 d_model
)
self.dropout = nn.Dropout(0.1)
self.layer_norm = nn.LayerNorm(d_model) #层归一化
def forward(self,x):
res = x #保留原始输入
x = self.ffn(x) #传入三明治ffn网络
x = self.dropout(x)
output = self.layer_norm(x+res) #x+res 形成残差结构,再送入层归一化
return output
# 测试数据
if __name__ == '__main__':
batch_size = 2
seq_len = 4
d_model = 512
d_ff = 2048
ffn = FFN(d_model,d_ff)
x = torch.randn(batch_size,seq_len,d_model)
output = ffn(x)
print(output.shape) #torch.Size([2, 4, 512])
同样输出形状不变
因为文本与时间原因暂时写一半,剩下的下篇继续 (๑•̀ㅂ•́)و✧
以上有问题可以指出。
更多推荐
所有评论(0)