前言

本文学习自Unist的论文实现,源码地址:https://github.com/tsinghua-fib-lab/UniST/tree/main,本内容为个人理解整理。


一、Embedding和Tokenization实例介绍

参考UniST代码中Embed.py的内容,接下来详细讲解一下如何将栅格数据时间embedding结合转变为Transformer可以接收的token。

这部分Embedding主要分成三个部分:TokenEmbedding + TemporalEmbedding + DataEmbedding。TokenEmbedding 负责将 [N,C,T,W,H] 的数值数据Tokenization为Transformer可以吸收的形式,TemporalEmbedding 负责将时间Embed为两个时间层级,并进行复制匹配,让在不同空间点但是时间相同的token能够匹配上。DataEmbedding负责将TokenEmbedding的输出和TemporalEmbedding的输出相融合。

1.TokenEmbedding

模型的初始化参数:
c i n c_{in} cin:输入数据特征维度,在UniST中为1(因为是序列数据,输入数据维度为1维);
d m o d e l d_{model} dmodel:隐藏层的特征维度,在UniST中,对于不同的数据类型对应不同的值:size=1/2 → 64;size=3/4/‘middle’ →128;size=5/6/7 →256;size=‘large’ →384;
t patch_size t_{\text{patch\_size}} tpatch_size:该变量用于将3维时空张量patch分割更小的3维张量。该值用于决定分割时间维数据的大小,默认为2。
p a t c h s i z e patch_{size} patchsize:同 t patch_size t_{\text{patch\_size}} tpatch_size,只是用于分割空间维,默认为2。

模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s imgs imgs变量,是所有数据的完整序列。

这里用三维卷积进行Tokenization,其中卷积核和步长均为 [ t patch_size , p a t c h s i z e , p a t c h s i z e ] [t_{\text{patch\_size}},patch_{size}, patch_{size}] [tpatch_size,patchsize,patchsize] ,这意味着将时间维每 t patch_size t_{\text{patch\_size}} tpatch_size 帧取一个块、空间每 p a t c h s i z e × p a t c h s i z e patch_{size} × patch_{size} patchsize×patchsize 取一个块,输入为 [ N , C , T , H , W ] [N, C, T, H, W] [N,C,T,H,W]的X,每个样本维度为 [ C , T / t patch_size , H / p a t c h s i z e , W / p a t c h s i z e ] [C,T/t_{\text{patch\_size}},H/patch_{size},W/patch_{size}] [C,T/tpatch_size,H/patchsize,W/patchsize]

卷积核和步长相同,意味着每个token是不重叠的切块,patch 边界处的信息不会共享,容易出现对边界变化不敏感。

接着利用flatten函数将空间维铺平,将 [ N , C , T , H , W ] [N, C, T, H, W] [N,C,T,H,W]转变为 [ N , C , T , H ∗ W ] [N, C, T, H*W] [N,C,T,HW],最后用reshape转变为 [ N , T ∗ C ∗ H ∗ W , C ] [N, T*C*H*W, C] [N,TCHW,C]

Transformer实际训练时,输入为token序列(批次),输入的张量维度为: [ b a t c h s i z e , s e q l e n , d m o d e l ​ ] [batch_{size},seq_{len},d_{model}​] [batchsize,seqlen,dmodel]

经过TokenEmbedding的处理之后,输出的 x x x 变成能够直接输入到Transformer模块的形式。

代码如下:

class TokenEmbedding(nn.Module):
    def __init__(self, c_in, d_model, t_patch_size, patch_size):
        super(TokenEmbedding, self).__init__()
        kernel_size = [t_patch_size,patch_size, patch_size]
        self.tokenConv = nn.Conv3d(in_channels=c_in, out_channels=d_model,
                                   kernel_size=kernel_size, stride=kernel_size)
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        # N, C, T, H, W = x.shape
        x = self.tokenConv(x)
        x = x.flatten(3)
        x = torch.einsum("ncts->ntsc", x)  # [N, T, H*W, C]
        x = x.reshape(x.shape[0], -1, x.shape[-1])  # [N, T*C*H*W, C]
        return x

nn.init.kaiming_normal_这个函数是Kaiming/He 初始化,用来给卷积核权重一个“合适尺度”的随机初值。kaiming_normal_表示从正态分布采样权重,但会控制方差,让信号在层间更稳定。mode='fan_in’表示用输入通道数(卷积核的输入连接数 fan_in)来定方差,用于让前向激活尺度稳定。nonlinearity='leaky_relu’则是告诉初始化公式你后面打算用的激活函数类型,从而用匹配 LeakyReLU 的增益系数(gain)来设定权重方差。

einsum(“ncts->ntsc”, x) 则等价于x = x.permute(0, 2, 3, 1) # n c t s -> n t s c

TokenEmbedding 这个类本质上是在做patchify + projection,不是 NLP 那种“词表切分(tokenization),只是Transformer里的输入需要一串“token”,把“每个 patch 当成一个 token,在这个语境下就顺手也叫 tokenization”

  • 视觉/时空Patch embedding(常被称作“tokenization”) 和 NLP词表 tokenization 的区别
    数据形态不同导致处理方式不同。
    由于时空数据是连续的数值数据,因此常用切块(patchify)并通过线性层或卷积将每个局部块投影为
    d_model 维向量序列;这些向量作为 Transformer 的 token 表示,整体过程可微,可端到端反向传播。
    而 NLP 的词表 tokenization 需要将文本映射为有限词表上的离散 token id 序列(切分与 id 映射不可微);随后通过 embedding lookup(词向量表)把离散 id 转为连续向量序列,这一步对 embedding 参数是可微、可训练的。
  • 不同数据形态常见的 embedding方法
    视频/时空栅格(T×H×W):很常见用 3D 卷积做 patch embedding,因为它同时处理时间和空间分块;
    纯 2D 图像(H×W):更常见的是 2D 卷积或线性层;
    时间序列(T×C):常见的是 1D 卷积(沿时间切 patch)或 Linear 投影;
    图数据(nodes×features):常见是 GNN 层、MLP、节点嵌入表;
    轨迹(点序列 + 坐标):常用 MLP、1D conv、Transformer embedding(位置、速度、时间戳等)。

2.TemporalEmbedding

模型的初始化参数:
d m o d e l d_{model} dmodel:隐藏层的特征维度;
t patch_size t_{\text{patch\_size}} tpatch_size:该变量用于将3维时空张量patch分割更小的3维张量。该值用于决定分割时间维数据的大小,默认为2;
h o u r s i z e hour_{size} hoursize:根据采用频率不同取不同的值,如果采用频率为每小时1次,则为24;如果是半小时一次,则为48;
w e e k d a y s i z e weekday_{size} weekdaysize:因为一周7天,默认为7。

模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s m a s k imgs_{mask} imgsmask变量,是时间戳特征。形状为(B, T, 2),最后一维是[weekday, time_slot]。

首先利用 long() 对输入 x x x进行整型变换,因为Embedding索引必须是整型。然后用self.hour_embed(x[:,:,1])即nn.Embedding(hour_size, d_model)把时间槽 id 映射到 d_model 维向量。用nn.Embedding(weekday_size, d_model)把星期 id 映射到 d_model 维向量

self.hour_embed = nn.Embedding(hour_size, d_model)建一个大小为 (hour_size, d_model) 的可学习矩阵输入一个 hour_id(比如 13 点对应某个槽位 id),输出这行对应的 d_model 维向量。从而把离散的“时间槽编号”变成连续语义向量。self.weekday_embed同理。

“小时/星期几”这种是离散属性,不想把它当连续数值(比如 hour=23 比 hour=0 大,但语义上它们其实相邻)。通过 Embedding 让模型可以学到“哪些小时相似/哪些小时不同”的表示,不会被“数值大小”误导(避免把时间当线性变量)

然后将Embedding相加,融合“小时+星期”,再转回 ( B , T , d m o d e l ) (B, T, d_{model}) (B,T,dmodel)

最后再用一维卷积进行时间分块对齐,使得Embedding结果能够与主分支的时间patch划分一致,后面与TokenEmbedding的结果相加。卷积后时间长度变为 T ′ T' T(即 T / t patch_size T/t_{\text{patch\_size}} T/tpatch_size)。输出变量形状为 ( B , T ′ , d m o d e l ) (B, T', d_{model}) (B,T,dmodel)

代码如下:

class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, t_patch_size = 1, hour_size=48, weekday_size = 7):
        super(TemporalEmbedding, self).__init__()

        hour_size = hour_size
        weekday_size = weekday_size

        self.hour_embed = nn.Embedding(hour_size, d_model)
        self.weekday_embed = nn.Embedding(weekday_size, d_model)
        self.timeconv = nn.Conv1d(in_channels=d_model, out_channels=d_model, kernel_size=t_patch_size, stride=t_patch_size)

    def forward(self, x):
        x = x.long()
        hour_x = self.hour_embed(x[:,:,1])
        weekday_x = self.weekday_embed(x[:,:,0])
        timeemb = self.timeconv(hour_x.transpose(1,2)+weekday_x.transpose(1,2)).transpose(1,2)
        return timeemb

3.DataEmbedding

模型的初始化参数:
a r g s args args:整个模型的参数;
v a l u e e m b e d d i n g value_{embedding} valueembedding:用TokenEmbedding初始化得到,形状为 [ N , T ∗ C ∗ H ∗ W , C ] [N, T*C*H*W, C] [N,TCHW,C]
t e m p o r a l e m b e d d i n g temporal_{embedding} temporalembedding:用TemporalEmbedding初始化得到,形状为 ( B , T ′ , d m o d e l ) (B, T', d_{model}) (B,T,dmodel)
d r o p o u t dropout dropout:dropout率。

模型的前向参数:
x x x:在UniST中,该值对应model.py中forward函数里面的 i m g s imgs imgs变量,是主序列;
x m a r k x_{mark} xmark:在UniST中,该值对应model.py中forward函数里面的 i m g s m a s k imgs_{mask} imgsmask变量,是时间戳特征;
i s t i m e is_{time} istime:当值为1,则使用时间信息;否则不使用时间信息。猜测是为了消融实验则设置的变量。

该类用于把数值序列 token和时间特征 token对齐后融合,输出给 encoder 用的输入表示。

assert TokenEmb.shape[1] == TimeEmb.shape[1] * H // self.args.patch_size * W // self.args.patch_size
TimeEmb = torch.repeat_interleave(TimeEmb, TokenEmb.shape[1]//TimeEmb.shape[1], dim=1)

这一部分表示当长度关系成立时,把每个时间步的 TimeEmb 沿序列维复制“空间patch数”次。因为TokenEmb 是展平后的时空 token,长度是 T ′ ∗ S T' * S TS S = H ′ ∗ W ′ S=H' * W' S=HW 的空间 patch 数)。而TimeEmb 只有时间长度 T ′ T' T,每个时间步 1 个向量。所以要把 TimeEmb复制 S S S 次,变成长度 T ′ ∗ S T' * S TS,才能和 TokenEmb 一一相加。这一部分代表同一时刻下,不同的空间位置共享同一个时间Embedding(“时间语义”)。

最终返回随机失活的 x x x(如果 i s t i m e is_{time} istime为1,则为token嵌入和时间嵌入的和)和时间嵌入。经过层层处理后,输出的 x x x 包含了位置信息和时间信息,而且相比原始输入形状变为 ( B , L , d m o d e l ) (B, L, d_{model}) (B,L,dmodel),其中 L = ( T / t patch_size ∗ H / p a t c h s i z e ∗ W / p a t c h s i z e ) L = (T/t_{\text{patch\_size}}*H/patch_{size}*W/patch_{size}) L=(T/tpatch_sizeH/patchsizeW/patchsize)。相对原始输入,时间维从 T T T 被分块到 T / t patch_size T/t_{\text{patch\_size}} T/tpatch_size,空间维从 H , W H, W H,W 被分块到 H / p a t c h s i z e , W / p a t c h s i z e H/patch_{size},W/patch_{size} H/patchsize,W/patchsize,然后得到的 ( T ′ , H ′ , W ′ ) (T', H', W') (T,H,W) 三个维度被展平为序列长度 L L L。数值特征从原始值映射成 d m o d e l d_{model} dmodel 维 embedding,并与时间 embedding 相加后做 dropout。

代码如下:

class DataEmbedding(nn.Module):
    def __init__(self, c_in, d_model, dropout=0.1, args=None, size1 = 48, size2=7):
        super(DataEmbedding, self).__init__()
        self.args = args
        self.value_embedding = TokenEmbedding(c_in=c_in, d_model=d_model, t_patch_size = args.t_patch_size,  patch_size=args.patch_size)
        self.temporal_embedding = TemporalEmbedding(t_patch_size = args.t_patch_size, d_model=d_model, hour_size  = size1, weekday_size = size2) 
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark, is_time=1):
        '''
        x: N, T, C, H, W
        x_mark: N, T, D
        '''
        N, T, C, H, W = x.shape
        TokenEmb = self.value_embedding(x)
        TimeEmb = self.temporal_embedding(x_mark)
        assert TokenEmb.shape[1] == TimeEmb.shape[1] * H // self.args.patch_size * W // self.args.patch_size
        TimeEmb = torch.repeat_interleave(TimeEmb, TokenEmb.shape[1]//TimeEmb.shape[1], dim=1)
        assert TokenEmb.shape == TimeEmb.shape
        if is_time==1:
            x = TokenEmb + TimeEmb
        else:
            x = TokenEmb
        return self.dropout(x), TimeEmb

二、2D正余弦位置编码

1.get_2d_sincos_pos_embed

生成形状为[grid_size1]和[grid_size2]的grid_h和grid_w,接着利用np.meshgrid(grid_w, grid_h)
生成二维坐标网格,然后通过np.stack(grid, axis=0)把两个坐标矩阵叠起来,形状变为 [2, grid_size1, grid_size2],其中grid[0] 是一维度坐标图,grid[1] 是另一维度坐标图。利用reshape([2, 1, grid_size1, grid_size2])增加加一个维度,变 [2,1,H,W],方便后面统一处理。

最后输入get_2d_sincos_pos_embed_from_grid函数把这组坐标映射成 sin/cos 向量,输出每个位置一个 embed_dim 维向量。由此可见输出变量的形状为 [grid_size1*grid_size2, embed_dim]。

def get_2d_sincos_pos_embed(embed_dim, grid_size1, grid_size2, cls_token=False):
    """
    grid_size: int of the grid height and width
    return:
    pos_embed: [grid_size*grid_size, embed_dim]
    """
    grid_h = np.arange(grid_size1, dtype=np.float32)
    grid_w = np.arange(grid_size2, dtype=np.float32)
    grid = np.meshgrid(grid_w, grid_h)  # here w goes first
    grid = np.stack(grid, axis=0)

    grid = grid.reshape([2, 1, grid_size1, grid_size2])
    pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
    return pos_embed

函数get_2d_sincos_pos_embed_from_grid

这个函数的作用是把二维网格坐标 grid 编码成二维 sin/cos 位置向量。具体过程如下:

首先要确保embed_dim 是偶数,因为要一半给一个方向,一半给另一个方向。

但其实结合后面1D正余弦位置编码,这里的embed_dim应该要能被4整除,因为后面又再次要求是偶数。在实际的代码实现中,embedding的维度是64、128、256这类数值

然后调用get_1d_sincos_pos_embed_from_grid()分别对 grid[0] 和 grid[1] 这一路坐标做 1D sin/cos 编码,输出形状 (H*W, D/2)。

最后把两个方向的编码在特征维拼接,得到 (H*W, D)。

返回 emb ,每个空间位置一个 D 维向量。

def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
    assert embed_dim % 2 == 0

    # use half of dimensions to encode grid_h
    emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0])  # (H*W, D/2)
    emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1])  # (H*W, D/2)

    emb = np.concatenate([emb_h, emb_w], axis=1)  # (H*W, D)
    return emb

函数get_1d_sincos_pos_embed_from_grid

这个函数是在做标准的 1D SinCos 位置编码,给每个位置 pos 生成一个 D 维向量。

公式如下: P E ( p o s , 2 i ) = s i n ( p o s 10000 ( 2 i / D ) ) ,     P E ( p o s , 2 i + 1 ) = c o s ( p o s 10000 ( 2 i / D ) ) PE(pos, 2i)=sin(\frac{pos}{10000^{(2i/D)}}),\ \ \ PE(pos, 2i+1)=cos(\frac{pos}{10000^{(2i/D)}}) PE(pos,2i)=sin(10000(2i/D)pos),   PE(pos,2i+1)=cos(10000(2i/D)pos)
其中 i i i [ 0 , D − 1 2 ] [0,\frac{D-1}{2}] [0,2D1]

具体过程如下:

首先确保D维度为偶数,要一半维度放 sin,一半放 cos。

接着构造频率 omega,按照 D 2 \frac{D}{2} 2D的长度构造向量,得到从高频到低频的一组频率系数。

然后利用pos.reshape展平网格,变成长度为[H’*W’]的向量。

将位置和频率进行外积运算得到相位矩阵:np.einsum(“m,d->md”, pos, omega) ,这里第 m 个位置和第 d 个频率相乘,得到该频率下的相位。然后分别取 sin/cos,这里的输出暂且只是(H’*W’, D/2),最后要拼接成最终编码 (H’*W’, D),每个位置最终是一个 D 维向量。

omega 频率是按对数间隔(log-spaced)分布,不是线性分布。低维通道高频(变化快),高维通道低频(变化慢)。这样同时覆盖“短距离”和“长距离”位置信息。此外,数值10000只是经典 Transformer 的经验基数,不是唯一可选值。

可以看到隐藏层维度在这里产生了明显影响,embed_dim 越大,omega 里的频率点越多,位置编码能表达的“尺度”更细。

def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
    """
    embed_dim: output dimension for each position
    pos: a list of positions to be encoded: size (M,)
    out: (M, D)
    """
    assert embed_dim % 2 == 0
    omega = np.arange(embed_dim // 2, dtype=np.float32)
    omega /= embed_dim / 2.0
    omega = 1.0 / 10000**omega  # (D/2,)

    pos = pos.reshape(-1)  # (M,)
    out = np.einsum("m,d->md", pos, omega)  # (M, D/2), outer product

    emb_sin = np.sin(out)  # (M, D/2)
    emb_cos = np.cos(out)  # (M, D/2)

    emb = np.concatenate([emb_sin, emb_cos], axis=1)  # (M, D)
    return emb
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐