代码学习 - 数据Embedding
本文学习自Unist的论文实现,源码地址:https://github.com/tsinghua-fib-lab/UniST/tree/main,本内容为个人理解整理。参考UniST代码中Embed.py的内容,接下来详细讲解一下如何将栅格数据与时间embedding结合转变为Transformer可以接收的token。这部分Embedding主要分成三个部分:TokenEmbedding + T
文章目录
前言
本文学习自Unist的论文实现,源码地址:https://github.com/tsinghua-fib-lab/UniST/tree/main,本内容为个人理解整理。
一、Embedding和Tokenization实例介绍
参考UniST代码中Embed.py的内容,接下来详细讲解一下如何将栅格数据与时间embedding结合转变为Transformer可以接收的token。
这部分Embedding主要分成三个部分:TokenEmbedding + TemporalEmbedding + DataEmbedding。TokenEmbedding 负责将 [N,C,T,W,H] 的数值数据Tokenization为Transformer可以吸收的形式,TemporalEmbedding 负责将时间Embed为天和周两个时间层级,并进行复制匹配,让在不同空间点但是时间相同的token能够匹配上。DataEmbedding负责将TokenEmbedding的输出和TemporalEmbedding的输出相融合。
1.TokenEmbedding
模型的初始化参数:
c i n c_{in} cin:输入数据特征维度,在UniST中为1(因为是序列数据,输入数据维度为1维);
d m o d e l d_{model} dmodel:隐藏层的特征维度,在UniST中,对于不同的数据类型对应不同的值:size=1/2 → 64;size=3/4/‘middle’ →128;size=5/6/7 →256;size=‘large’ →384;
t patch_size t_{\text{patch\_size}} tpatch_size:该变量用于将3维时空张量patch分割更小的3维张量。该值用于决定分割时间维数据的大小,默认为2。
p a t c h s i z e patch_{size} patchsize:同 t patch_size t_{\text{patch\_size}} tpatch_size,只是用于分割空间维,默认为2。
模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s imgs imgs变量,是所有数据的完整序列。
这里用三维卷积进行Tokenization,其中卷积核和步长均为 [ t patch_size , p a t c h s i z e , p a t c h s i z e ] [t_{\text{patch\_size}},patch_{size}, patch_{size}] [tpatch_size,patchsize,patchsize] ,这意味着将时间维每 t patch_size t_{\text{patch\_size}} tpatch_size 帧取一个块、空间每 p a t c h s i z e × p a t c h s i z e patch_{size} × patch_{size} patchsize×patchsize 取一个块,输入为 [ N , C , T , H , W ] [N, C, T, H, W] [N,C,T,H,W]的X,每个样本维度为 [ C , T / t patch_size , H / p a t c h s i z e , W / p a t c h s i z e ] [C,T/t_{\text{patch\_size}},H/patch_{size},W/patch_{size}] [C,T/tpatch_size,H/patchsize,W/patchsize]。
卷积核和步长相同,意味着每个token是不重叠的切块,patch 边界处的信息不会共享,容易出现对边界变化不敏感。
接着利用flatten函数将空间维铺平,将 [ N , C , T , H , W ] [N, C, T, H, W] [N,C,T,H,W]转变为 [ N , C , T , H ∗ W ] [N, C, T, H*W] [N,C,T,H∗W],最后用reshape转变为 [ N , T ∗ C ∗ H ∗ W , C ] [N, T*C*H*W, C] [N,T∗C∗H∗W,C]。
Transformer实际训练时,输入为token序列(批次),输入的张量维度为: [ b a t c h s i z e , s e q l e n , d m o d e l ] [batch_{size},seq_{len},d_{model}] [batchsize,seqlen,dmodel]
经过TokenEmbedding的处理之后,输出的 x x x 变成能够直接输入到Transformer模块的形式。
代码如下:
class TokenEmbedding(nn.Module):
def __init__(self, c_in, d_model, t_patch_size, patch_size):
super(TokenEmbedding, self).__init__()
kernel_size = [t_patch_size,patch_size, patch_size]
self.tokenConv = nn.Conv3d(in_channels=c_in, out_channels=d_model,
kernel_size=kernel_size, stride=kernel_size)
for m in self.modules():
if isinstance(m, nn.Conv3d):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')
def forward(self, x):
# N, C, T, H, W = x.shape
x = self.tokenConv(x)
x = x.flatten(3)
x = torch.einsum("ncts->ntsc", x) # [N, T, H*W, C]
x = x.reshape(x.shape[0], -1, x.shape[-1]) # [N, T*C*H*W, C]
return x
nn.init.kaiming_normal_这个函数是Kaiming/He 初始化,用来给卷积核权重一个“合适尺度”的随机初值。kaiming_normal_表示从正态分布采样权重,但会控制方差,让信号在层间更稳定。mode='fan_in’表示用输入通道数(卷积核的输入连接数 fan_in)来定方差,用于让前向激活尺度稳定。nonlinearity='leaky_relu’则是告诉初始化公式你后面打算用的激活函数类型,从而用匹配 LeakyReLU 的增益系数(gain)来设定权重方差。
einsum(“ncts->ntsc”, x) 则等价于x = x.permute(0, 2, 3, 1) # n c t s -> n t s c
TokenEmbedding 这个类本质上是在做patchify + projection,不是 NLP 那种“词表切分(tokenization),只是Transformer里的输入需要一串“token”,把“每个 patch 当成一个 token,在这个语境下就顺手也叫 tokenization”
- 视觉/时空Patch embedding(常被称作“tokenization”) 和 NLP词表 tokenization 的区别
数据形态不同导致处理方式不同。
由于时空数据是连续的数值数据,因此常用切块(patchify)并通过线性层或卷积将每个局部块投影为
d_model 维向量序列;这些向量作为 Transformer 的 token 表示,整体过程可微,可端到端反向传播。
而 NLP 的词表 tokenization 需要将文本映射为有限词表上的离散 token id 序列(切分与 id 映射不可微);随后通过 embedding lookup(词向量表)把离散 id 转为连续向量序列,这一步对 embedding 参数是可微、可训练的。 - 不同数据形态常见的 embedding方法
视频/时空栅格(T×H×W):很常见用 3D 卷积做 patch embedding,因为它同时处理时间和空间分块;
纯 2D 图像(H×W):更常见的是 2D 卷积或线性层;
时间序列(T×C):常见的是 1D 卷积(沿时间切 patch)或 Linear 投影;
图数据(nodes×features):常见是 GNN 层、MLP、节点嵌入表;
轨迹(点序列 + 坐标):常用 MLP、1D conv、Transformer embedding(位置、速度、时间戳等)。
2.TemporalEmbedding
模型的初始化参数:
d m o d e l d_{model} dmodel:隐藏层的特征维度;
t patch_size t_{\text{patch\_size}} tpatch_size:该变量用于将3维时空张量patch分割更小的3维张量。该值用于决定分割时间维数据的大小,默认为2;
h o u r s i z e hour_{size} hoursize:根据采用频率不同取不同的值,如果采用频率为每小时1次,则为24;如果是半小时一次,则为48;
w e e k d a y s i z e weekday_{size} weekdaysize:因为一周7天,默认为7。
模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s m a s k imgs_{mask} imgsmask变量,是时间戳特征。形状为(B, T, 2),最后一维是[weekday, time_slot]。
首先利用 long() 对输入 x x x进行整型变换,因为Embedding索引必须是整型。然后用self.hour_embed(x[:,:,1])即nn.Embedding(hour_size, d_model)把时间槽 id 映射到 d_model 维向量。用nn.Embedding(weekday_size, d_model)把星期 id 映射到 d_model 维向量。
self.hour_embed = nn.Embedding(hour_size, d_model)建一个大小为 (hour_size, d_model) 的可学习矩阵,输入一个 hour_id(比如 13 点对应某个槽位 id),输出这行对应的 d_model 维向量。从而把离散的“时间槽编号”变成连续语义向量。self.weekday_embed同理。
“小时/星期几”这种是离散属性,不想把它当连续数值(比如 hour=23 比 hour=0 大,但语义上它们其实相邻)。通过 Embedding 让模型可以学到“哪些小时相似/哪些小时不同”的表示,不会被“数值大小”误导(避免把时间当线性变量)
然后将Embedding相加,融合“小时+星期”,再转回 ( B , T , d m o d e l ) (B, T, d_{model}) (B,T,dmodel) 。
最后再用一维卷积进行时间分块对齐,使得Embedding结果能够与主分支的时间patch划分一致,后面与TokenEmbedding的结果相加。卷积后时间长度变为 T ′ T' T′(即 T / t patch_size T/t_{\text{patch\_size}} T/tpatch_size)。输出变量形状为 ( B , T ′ , d m o d e l ) (B, T', d_{model}) (B,T′,dmodel)
代码如下:
class TemporalEmbedding(nn.Module):
def __init__(self, d_model, t_patch_size = 1, hour_size=48, weekday_size = 7):
super(TemporalEmbedding, self).__init__()
hour_size = hour_size
weekday_size = weekday_size
self.hour_embed = nn.Embedding(hour_size, d_model)
self.weekday_embed = nn.Embedding(weekday_size, d_model)
self.timeconv = nn.Conv1d(in_channels=d_model, out_channels=d_model, kernel_size=t_patch_size, stride=t_patch_size)
def forward(self, x):
x = x.long()
hour_x = self.hour_embed(x[:,:,1])
weekday_x = self.weekday_embed(x[:,:,0])
timeemb = self.timeconv(hour_x.transpose(1,2)+weekday_x.transpose(1,2)).transpose(1,2)
return timeemb
3.DataEmbedding
模型的初始化参数:
a r g s args args:整个模型的参数;
v a l u e e m b e d d i n g value_{embedding} valueembedding:用TokenEmbedding初始化得到,形状为 [ N , T ∗ C ∗ H ∗ W , C ] [N, T*C*H*W, C] [N,T∗C∗H∗W,C];
t e m p o r a l e m b e d d i n g temporal_{embedding} temporalembedding:用TemporalEmbedding初始化得到,形状为 ( B , T ′ , d m o d e l ) (B, T', d_{model}) (B,T′,dmodel);
d r o p o u t dropout dropout:dropout率。
模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s imgs imgs变量,是主序列;
x m a r k x_{mark} xmark:在UniST中,该值对应model.py中forward函数里面的 i m g s m a s k imgs_{mask} imgsmask变量,是时间戳特征;
i s t i m e is_{time} istime:当值为1,则使用时间信息;否则不使用时间信息。猜测是为了消融实验则设置的变量。
该类用于把数值序列 token和时间特征 token对齐后融合,输出给 encoder 用的输入表示。
assert TokenEmb.shape[1] == TimeEmb.shape[1] * H // self.args.patch_size * W // self.args.patch_size
TimeEmb = torch.repeat_interleave(TimeEmb, TokenEmb.shape[1]//TimeEmb.shape[1], dim=1)
这一部分表示当长度关系成立时,把每个时间步的 TimeEmb 沿序列维复制“空间patch数”次。因为TokenEmb 是展平后的时空 token,长度是 T ′ ∗ S T' * S T′∗S( S = H ′ ∗ W ′ S=H' * W' S=H′∗W′ 的空间 patch 数)。而TimeEmb 只有时间长度 T ′ T' T′,每个时间步 1 个向量。所以要把 TimeEmb复制 S S S 次,变成长度 T ′ ∗ S T' * S T′∗S,才能和 TokenEmb 一一相加。这一部分代表同一时刻下,不同的空间位置共享同一个时间Embedding(“时间语义”)。
最终返回随机失活的 x x x(如果 i s t i m e is_{time} istime为1,则为token嵌入和时间嵌入的和)和时间嵌入。经过层层处理后,输出的 x x x 包含了位置信息和时间信息,而且相比原始输入形状变为 ( B , L , d m o d e l ) (B, L, d_{model}) (B,L,dmodel),其中 L = ( T / t patch_size ∗ H / p a t c h s i z e ∗ W / p a t c h s i z e ) L = (T/t_{\text{patch\_size}}*H/patch_{size}*W/patch_{size}) L=(T/tpatch_size∗H/patchsize∗W/patchsize)。相对原始输入,时间维从 T T T 被分块到 T / t patch_size T/t_{\text{patch\_size}} T/tpatch_size,空间维从 H , W H, W H,W 被分块到 H / p a t c h s i z e , W / p a t c h s i z e H/patch_{size},W/patch_{size} H/patchsize,W/patchsize,然后得到的 ( T ′ , H ′ , W ′ ) (T', H', W') (T′,H′,W′) 三个维度被展平为序列长度 L L L。数值特征从原始值映射成 d m o d e l d_{model} dmodel 维 embedding,并与时间 embedding 相加后做 dropout。
代码如下:
class DataEmbedding(nn.Module):
def __init__(self, c_in, d_model, dropout=0.1, args=None, size1 = 48, size2=7):
super(DataEmbedding, self).__init__()
self.args = args
self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model, t_patch_size = args.t_patch_size, patch_size=args.patch_size)
self.temporal_embedding = TemporalEmbedding(t_patch_size = args.t_patch_size, d_model=d_model, hour_size = size1, weekday_size = size2)
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark, is_time=1):
'''
x: N, T, C, H, W
x_mark: N, T, D
'''
N, T, C, H, W = x.shape
TokenEmb = self.value_embedding(x)
TimeEmb = self.temporal_embedding(x_mark)
assert TokenEmb.shape[1] == TimeEmb.shape[1] * H // self.args.patch_size * W // self.args.patch_size
TimeEmb = torch.repeat_interleave(TimeEmb, TokenEmb.shape[1]//TimeEmb.shape[1], dim=1)
assert TokenEmb.shape == TimeEmb.shape
if is_time==1:
x = TokenEmb + TimeEmb
else:
x = TokenEmb
return self.dropout(x), TimeEmb
二、2D正余弦位置编码
1.get_2d_sincos_pos_embed
生成形状为[grid_size1]和[grid_size2]的grid_h和grid_w,接着利用np.meshgrid(grid_w, grid_h)
生成二维坐标网格,然后通过np.stack(grid, axis=0)把两个坐标矩阵叠起来,形状变为 [2, grid_size1, grid_size2],其中grid[0] 是一维度坐标图,grid[1] 是另一维度坐标图。利用reshape([2, 1, grid_size1, grid_size2])增加加一个维度,变 [2,1,H,W],方便后面统一处理。
最后输入get_2d_sincos_pos_embed_from_grid函数把这组坐标映射成 sin/cos 向量,输出每个位置一个 embed_dim 维向量。由此可见输出变量的形状为 [grid_size1*grid_size2, embed_dim]。
def get_2d_sincos_pos_embed(embed_dim, grid_size1, grid_size2, cls_token=False):
"""
grid_size: int of the grid height and width
return:
pos_embed: [grid_size*grid_size, embed_dim]
"""
grid_h = np.arange(grid_size1, dtype=np.float32)
grid_w = np.arange(grid_size2, dtype=np.float32)
grid = np.meshgrid(grid_w, grid_h) # here w goes first
grid = np.stack(grid, axis=0)
grid = grid.reshape([2, 1, grid_size1, grid_size2])
pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
return pos_embed
函数get_2d_sincos_pos_embed_from_grid
这个函数的作用是把二维网格坐标 grid 编码成二维 sin/cos 位置向量。具体过程如下:
首先要确保embed_dim 是偶数,因为要一半给一个方向,一半给另一个方向。
但其实结合后面1D正余弦位置编码,这里的embed_dim应该要能被4整除,因为后面又再次要求是偶数。在实际的代码实现中,embedding的维度是64、128、256这类数值
然后调用get_1d_sincos_pos_embed_from_grid()分别对 grid[0] 和 grid[1] 这一路坐标做 1D sin/cos 编码,输出形状 (H*W, D/2)。
最后把两个方向的编码在特征维拼接,得到 (H*W, D)。
返回 emb ,每个空间位置一个 D 维向量。
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
assert embed_dim % 2 == 0
# use half of dimensions to encode grid_h
emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0]) # (H*W, D/2)
emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1]) # (H*W, D/2)
emb = np.concatenate([emb_h, emb_w], axis=1) # (H*W, D)
return emb
函数get_1d_sincos_pos_embed_from_grid
这个函数是在做标准的 1D SinCos 位置编码,给每个位置 pos 生成一个 D 维向量。
公式如下: P E ( p o s , 2 i ) = s i n ( p o s 10000 ( 2 i / D ) ) , P E ( p o s , 2 i + 1 ) = c o s ( p o s 10000 ( 2 i / D ) ) PE(pos, 2i)=sin(\frac{pos}{10000^{(2i/D)}}),\ \ \ PE(pos, 2i+1)=cos(\frac{pos}{10000^{(2i/D)}}) PE(pos,2i)=sin(10000(2i/D)pos), PE(pos,2i+1)=cos(10000(2i/D)pos)
其中 i i i∈ [ 0 , D − 1 2 ] [0,\frac{D-1}{2}] [0,2D−1]
具体过程如下:
首先确保D维度为偶数,要一半维度放 sin,一半放 cos。
接着构造频率 omega,按照 D 2 \frac{D}{2} 2D的长度构造向量,得到从高频到低频的一组频率系数。
然后利用pos.reshape展平网格,变成长度为[H’*W’]的向量。
将位置和频率进行外积运算得到相位矩阵:np.einsum(“m,d->md”, pos, omega) ,这里第 m 个位置和第 d 个频率相乘,得到该频率下的相位。然后分别取 sin/cos,这里的输出暂且只是(H’*W’, D/2),最后要拼接成最终编码 (H’*W’, D),每个位置最终是一个 D 维向量。
omega 频率是按对数间隔(log-spaced)分布,不是线性分布。低维通道高频(变化快),高维通道低频(变化慢)。这样同时覆盖“短距离”和“长距离”位置信息。此外,数值10000只是经典 Transformer 的经验基数,不是唯一可选值。
可以看到隐藏层维度在这里产生了明显影响,embed_dim 越大,omega 里的频率点越多,位置编码能表达的“尺度”更细。
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
"""
embed_dim: output dimension for each position
pos: a list of positions to be encoded: size (M,)
out: (M, D)
"""
assert embed_dim % 2 == 0
omega = np.arange(embed_dim // 2, dtype=np.float32)
omega /= embed_dim / 2.0
omega = 1.0 / 10000**omega # (D/2,)
pos = pos.reshape(-1) # (M,)
out = np.einsum("m,d->md", pos, omega) # (M, D/2), outer product
emb_sin = np.sin(out) # (M, D/2)
emb_cos = np.cos(out) # (M, D/2)
emb = np.concatenate([emb_sin, emb_cos], axis=1) # (M, D)
return emb
更多推荐


所有评论(0)