《Transformer实战:AI架构师手把手教你搭建智能数字资产追踪系统》

引言:金融领域的"数据追踪困境"与Transformer的破局

你是否遇到过这样的场景?
作为金融分析师,你盯着屏幕上跳动的加密货币价格曲线,想找出比特币和以太坊的联动规律,却被24小时无休的交易数据淹没;作为量化交易者,你尝试用LSTM预测NFT地板价,却发现模型总是漏掉"某明星买入"这类长周期事件的影响;作为资产管理者,你想快速识别异常交易(比如大额洗币),却被传统时间序列模型的"短视"限制——它只能看到最近7天的数据。

数字资产追踪的核心痛点,在于长距离时间依赖(比如3个月前的政策事件对今日价格的影响)和多资产交叉关联(比如比特币暴跌引发的 altcoin 瀑布效应)。而传统的时间序列模型(ARIMA、LSTM)要么无法捕捉长周期依赖,要么难以并行处理多资产的复杂关系。

Transformer的出现,为解决这些问题提供了钥匙

  • 自注意力机制(Self-Attention)能同时关注"3个月前的政策"和"昨天的交易数据",捕捉长距离依赖;
  • 多头注意力(Multi-Head Attention)能并行分析多资产间的关联,比如比特币与以太坊的价格联动、NFT与底层区块链的关系;
  • 灵活的编码器-解码器结构,能同时支持趋势预测异常检测关联分析等多任务。

本文将带你做什么?
我们会从零开始,用Transformer搭建一个端到端的智能数字资产追踪系统,覆盖数据获取→预处理→模型设计→训练优化→功能落地→系统部署全流程。

读完本文你能获得什么?

  • 掌握Transformer在金融时间序列场景的落地方法;
  • 实现"趋势预测+异常检测+关联分析"三大核心功能;
  • 学会将模型封装为API,快速部署到生产环境。

准备工作:你需要的技术储备与环境

1. 技术栈要求

  • 基础能力:Python编程、Pandas/NumPy数据处理、PyTorch基本使用;
  • 机器学习基础:了解时间序列分析(比如滑动窗口)、损失函数(MSE/MAE)、优化器(AdamW);
  • Transformer基础:知道自注意力、位置编码、编码器-解码器的基本概念(不需要深入推导)。

2. 环境与工具

  • Python版本:3.8+(避免兼容性问题);
  • 必备库:PyTorch(1.10+)、Pandas、NumPy、Matplotlib、Scikit-learn、FastAPI、Transformers(可选,用于多模态);
  • 数据来源:CoinGecko API(免费获取加密货币历史数据)、Kaggle(加密货币/NFT数据集);
  • 部署工具:Uvicorn(运行FastAPI服务)、Docker(可选,容器化部署)。

3. 快速环境搭建

condapip安装依赖:

# 安装PyTorch(根据CUDA版本调整,无GPU用cpu版)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# 安装其他库
pip install pandas numpy matplotlib scikit-learn fastapi uvicorn pycoingecko joblib transformers

核心实战:从0到1搭建智能追踪系统

步骤一:问题定义与数据准备

数字资产追踪的核心任务,我们聚焦3个高频需求:

  1. 趋势预测:给定过去30天的价格/交易量数据,预测未来1天的资产价格;
  2. 异常检测:识别异常交易(比如大额抛售、洗币),用"重建误差"标记异常点;
  3. 关联分析:分析多资产间的关联(比如比特币与以太坊的价格联动),用注意力权重可视化。
1.1 数据获取:用CoinGecko API拿加密货币数据

我们以**比特币(bitcoin)以太坊(ethereum)**为例,获取2020-2023年的日度数据(Open/High/Low/Close/Volume):

import pandas as pd
from pycoingecko import CoinGeckoAPI

cg = CoinGeckoAPI()

def get_crypto_data(coin_ids, start_date, end_date):
    """获取多个加密货币的历史数据(价格+交易量)"""
    data = {}
    for coin_id in coin_ids:
        # 调用API获取历史数据(时间戳+价格+交易量)
        history = cg.get_coin_market_chart_range_by_id(
            id=coin_id,
            vs_currency="usd",
            from_timestamp=pd.Timestamp(start_date).timestamp(),
            to_timestamp=pd.Timestamp(end_date).timestamp()
        )
        # 转换为DataFrame
        price_df = pd.DataFrame(history["prices"], columns=["timestamp", "price"])
        volume_df = pd.DataFrame(history["total_volumes"], columns=["timestamp", "volume"])
        # 合并价格与交易量,按时间戳索引
        df = pd.merge(price_df, volume_df, on="timestamp")
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        df.set_index("timestamp", inplace=True)
        data[coin_id] = df
    # 合并多资产数据为宽表(列名:coin_id_feature)
    combined_df = pd.concat(data.values(), axis=1, keys=data.keys())
    combined_df.columns = ["_".join(col) for col in combined_df.columns]
    return combined_df

# 示例:获取比特币和以太坊的2020-2023年数据
coin_ids = ["bitcoin", "ethereum"]
start_date = "2020-01-01"
end_date = "2023-01-01"
raw_df = get_crypto_data(coin_ids, start_date, end_date)
print(raw_df.head())

输出结果(部分):

timestamp bitcoin_price bitcoin_volume ethereum_price ethereum_volume
2020-01-01 00:00:00 7194.89 14245678901 130.04 4567890123
2020-01-02 00:00:00 7200.12 13890123456 132.56 4321098765
1.2 数据预处理:从原始数据到模型输入

金融数据的"脏"是出了名的——缺失值、异常值、量纲不一致。我们需要做3件事:

(1)缺失值处理

用"前向填充"(ffill)填补缺失值(假设今天的价格与昨天相关):

raw_df.fillna(method="ffill", inplace=True)
(2)特征工程:增加有效特征

仅用价格和交易量不够,我们需要加入技术指标(金融分析师常用的信号):

  • 移动平均线(MA7/MA30):平滑价格曲线,反映趋势;
  • 相对强弱指数(RSI):衡量资产的超买/超卖状态(范围0-100,>70超买,<30超卖)。
def add_technical_indicators(df, coin_ids):
    """为每个资产添加技术指标"""
    for coin_id in coin_ids:
        price_col = f"{coin_id}_price"
        volume_col = f"{coin_id}_volume"
        
        # 7天/30天移动平均线
        df[f"{coin_id}_ma7"] = df[price_col].rolling(window=7).mean()
        df[f"{coin_id}_ma30"] = df[price_col].rolling(window=30).mean()
        
        # 相对强弱指数(RSI)
        delta = df[price_col].diff(1)  # 价格变化
        gain = delta.where(delta > 0, 0)  # 上涨幅度
        loss = -delta.where(delta < 0, 0)  # 下跌幅度
        avg_gain = gain.rolling(window=14).mean()  # 14天平均上涨
        avg_loss = loss.rolling(window=14).mean()  # 14天平均下跌
        rs = avg_gain / avg_loss
        df[f"{coin_id}_rsi"] = 100 - (100 / (1 + rs))
    return df

# 添加技术指标
processed_df = add_technical_indicators(raw_df, coin_ids)
# 去除计算指标产生的NaN行(前30天无MA30,前14天无RSI)
processed_df.dropna(inplace=True)
(3)归一化:消除量纲影响

金融数据的量纲差异大(比如比特币价格是万美元级,交易量是十亿级),需要用MinMaxScaler将数据缩放到[0,1]区间:

from sklearn.preprocessing import MinMaxScaler
import joblib

# 初始化归一化器
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(processed_df)

# 保存归一化器(后续反归一化需要)
joblib.dump(scaler, "scaler.pkl")
(4)构建时间序列:滑动窗口法

Transformer需要序列输入(比如过去30天的数据预测未来1天)。我们用滑动窗口法生成训练数据:

import numpy as np

def create_time_sequences(data, seq_len, output_len):
    """
    将时间序列转换为输入-输出对
    :param data: 归一化后的特征矩阵(shape: [n_samples, n_features])
    :param seq_len: 输入窗口长度(比如30天)
    :param output_len: 输出窗口长度(比如1天)
    :return: X (shape: [n_samples, seq_len, n_features]), y (shape: [n_samples, output_len, n_assets])
    """
    X, y = [], []
    n_samples = len(data) - seq_len - output_len + 1
    # 遍历所有可能的窗口
    for i in range(n_samples):
        # 输入:前seq_len个时间步的所有特征
        X.append(data[i:i+seq_len])
        # 输出:接下来output_len个时间步的"价格"特征(仅预测价格)
        price_cols = [processed_df.columns.get_loc(f"{coin}_price") for coin in coin_ids]
        y.append(data[i+seq_len:i+seq_len+output_len, price_cols])
    return np.array(X), np.array(y)

# 设定窗口参数
seq_len = 30  # 输入:过去30天
output_len = 1  # 输出:未来1天
X, y = create_time_sequences(scaled_data, seq_len, output_len)

# 划分训练/验证/测试集(时间序列必须按顺序划分,不能随机!)
train_ratio = 0.7
val_ratio = 0.2
train_size = int(train_ratio * len(X))
val_size = int(val_ratio * len(X))

X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test = X[train_size+val_size:], y[train_size+val_size:]

# 转换为PyTorch张量
import torch
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)

print(f"训练集形状:X_train={X_train.shape}, y_train={y_train.shape}")
# 输出:训练集形状:X_train=(584, 30, 10), y_train=(584, 1, 2)
# 解释:584个样本,每个样本输入30天×10个特征(2资产×5特征:price/volume/ma7/ma30/rsi),输出1天×2资产价格

步骤二:Transformer模型设计——适配金融时间序列

Transformer的核心是自注意力机制,但直接用论文中的Transformer并不适配金融场景——我们需要做3点调整:

  1. 输入嵌入:将高维特征(比如10维)映射到Transformer的隐藏维度(d_model);
  2. 位置编码:金融数据是时间序列,需要手动加入位置信息(用可学习的位置编码,比固定编码更灵活);
  3. 输出头:针对"多资产预测"任务,输出层需要对应资产数量(比如2个资产,输出2维)。
2.1 模型结构定义(PyTorch)
import torch.nn as nn

class TransformerAssetTracker(nn.Module):
    def __init__(
        self,
        input_dim: int,       # 输入特征维度(比如10)
        d_model: int,         # Transformer隐藏维度(比如64)
        nhead: int,           # 多头注意力头数(必须整除d_model,比如4)
        num_encoder_layers: int,  # 编码器层数(比如3)
        dim_feedforward: int,     # 前馈神经网络隐藏维度(比如256)
        output_dim: int,      # 输出维度(比如2:2个资产)
        seq_len: int,         # 输入窗口长度(比如30)
        dropout: float = 0.1  # Dropout率
    ):
        super().__init__()
        self.d_model = d_model
        
        # 1. 输入嵌入:将输入特征映射到d_model维度
        self.embedding = nn.Linear(input_dim, d_model)
        
        # 2. 可学习的位置编码:(1, seq_len, d_model),适配所有 batch
        self.pos_encoder = nn.Parameter(torch.randn(1, seq_len, d_model))
        
        # 3. Transformer编码器层(带残差连接+层归一化)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True  # 输入形状:(batch_size, seq_len, d_model)(必须开!)
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
        
        # 4. 输出层:将编码器输出映射到目标维度
        self.fc = nn.Linear(d_model * seq_len, output_dim)  # 展平所有时间步的特征
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        前向传播
        :param x: 输入张量(shape: [batch_size, seq_len, input_dim])
        :return: 输出张量(shape: [batch_size, output_len, output_dim])
        """
        batch_size = x.size(0)
        
        # 输入嵌入 + 位置编码
        x = self.embedding(x)  # (batch_size, seq_len, d_model)
        x = x + self.pos_encoder  # 加入位置信息
        x = self.dropout(x)
        
        # Transformer编码器
        encoder_out = self.transformer_encoder(x)  # (batch_size, seq_len, d_model)
        
        # 展平编码器输出(seq_len × d_model → 1维)
        encoder_out_flat = encoder_out.view(batch_size, -1)  # (batch_size, seq_len × d_model)
        
        # 输出层:映射到目标维度
        out = self.fc(encoder_out_flat)  # (batch_size, output_dim)
        
        # 调整形状为(batch_size, output_len, output_dim)(适配y的形状)
        out = out.view(batch_size, output_len, -1)
        return out
2.2 模型初始化

根据我们的数据参数,初始化模型:

# 模型参数
input_dim = X_train.shape[2]  # 10(2资产×5特征)
d_model = 64                  # 隐藏维度(可调整,比如32/128)
nhead = 4                     # 多头数(64÷4=16,必须整除)
num_encoder_layers = 3        # 编码器层数(可调整,比如2/4)
dim_feedforward = 256         # 前馈隐藏维度(可调整,比如128/512)
output_dim = len(coin_ids)    # 2(比特币+以太坊)

# 创建模型
model = TransformerAssetTracker(
    input_dim=input_dim,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    dim_feedforward=dim_feedforward,
    output_dim=output_dim,
    seq_len=seq_len,
    dropout=0.1
)

# 移动模型到GPU(如果有)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(model)

步骤三:模型训练与优化——解决过拟合与梯度爆炸

金融数据的噪声大非平稳性(比如牛市/熊市的分布差异),训练时容易过拟合。我们需要用以下技巧:

3.1 损失函数与优化器
  • 损失函数:回归任务用MSE(均方误差)(惩罚大误差);
  • 优化器:用AdamW(带权重衰减的Adam,缓解过拟合);
  • 学习率调度:用余弦退火(逐渐降低学习率,避免后期震荡)。
import torch.optim as optim

# 损失函数
criterion = nn.MSELoss()

# 优化器(权重衰减=1e-5,缓解过拟合)
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)

# 学习率调度(余弦退火,T_max=100:100个epoch后学习率降到最小值)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
3.2 数据加载器(避免内存爆炸)

用PyTorch的DataLoader批量加载数据,避免一次性加载全部数据:

from torch.utils.data import DataLoader, TensorDataset

# 训练集DataLoader(时间序列不能shuffle!)
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)

# 验证集DataLoader
val_dataset = TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
3.3 训练循环(带早停机制)

早停(Early Stopping):当验证集损失连续多轮不下降时,停止训练,避免过拟合。

from tqdm import tqdm

# 训练参数
epochs = 200
patience = 20  # 连续20轮验证损失不下降则停止
best_val_loss = float("inf")
patience_counter = 0

# 训练循环
for epoch in range(epochs):
    # 1. 训练模式
    model.train()
    train_loss = 0.0
    for batch_X, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
        # 移动数据到GPU
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)
        
        # 梯度清零
        optimizer.zero_grad()
        
        # 前向传播
        outputs = model(batch_X)
        
        # 计算损失
        loss = criterion(outputs, batch_y)
        
        # 反向传播+梯度裁剪(防止梯度爆炸)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)  # 梯度裁剪到1以内
        
        # 更新参数
        optimizer.step()
        
        # 累加训练损失
        train_loss += loss.item() * batch_X.size(0)
    
    # 计算平均训练损失
    train_loss /= len(train_loader.dataset)
    
    # 2. 验证模式(关闭Dropout)
    model.eval()
    val_loss = 0.0
    with torch.no_grad():  # 不计算梯度
        for batch_X, batch_y in val_loader:
            batch_X = batch_X.to(device)
            batch_y = batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            val_loss += loss.item() * batch_X.size(0)
    
    # 计算平均验证损失
    val_loss /= len(val_loader.dataset)
    
    # 3. 学习率调度
    scheduler.step()
    
    # 打印日志
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
    
    # 4. 早停检查
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        # 保存最佳模型
        torch.save(model.state_dict(), "best_transformer_model.pth")
        print("→ 保存最佳模型!")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("→ 早停触发!")
            break

步骤四:核心功能实现——从模型到业务价值

训练好模型后,我们需要将其转化为可落地的功能:趋势预测、异常检测、关联分析。

4.1 功能1:趋势预测——预测未来1天价格

预测的关键是反归一化(将模型输出的[0,1]值转换为真实价格)。

import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, root_mean_squared_error

# 加载最佳模型和归一化器
model.load_state_dict(torch.load("best_transformer_model.pth"))
model.to(device)
model.eval()
scaler = joblib.load("scaler.pkl")

def inverse_transform_prediction(preds, scaler, original_cols, target_cols):
    """
    将模型输出的归一化值反转为真实价格
    :param preds: 模型输出(shape: [n_samples, output_len, n_assets])
    :param scaler: 训练好的归一化器
    :param original_cols: 原始数据的列名列表
    :param target_cols: 目标列名(比如["bitcoin_price", "ethereum_price"])
    :return: 反归一化后的真实价格(shape: [n_samples, output_len, n_assets])
    """
    # 展平 preds 为二维(n_samples×output_len, n_assets)
    preds_flat = preds.reshape(-1, len(target_cols))
    
    # 创建dummy矩阵(填充目标列,其他列用0)
    dummy = np.zeros((preds_flat.shape[0], len(original_cols)))
    for i, col in enumerate(target_cols):
        dummy[:, original_cols.index(col)] = preds_flat[:, i]
    
    # 反归一化
    inv_dummy = scaler.inverse_transform(dummy)
    
    # 提取目标列并恢复形状
    inv_preds = inv_dummy[:, [original_cols.index(col) for col in target_cols]]
    return inv_preds.reshape(preds.shape)

# 预测测试集
with torch.no_grad():
    X_test = X_test.to(device)
    y_test = y_test.to(device)
    y_pred = model(X_test)

# 反归一化
original_cols = processed_df.columns.tolist()
target_cols = [f"{coin}_price" for coin in coin_ids]
y_test_inv = inverse_transform_prediction(y_test.cpu().numpy(), scaler, original_cols, target_cols)
y_pred_inv = inverse_transform_prediction(y_pred.cpu().numpy(), scaler, original_cols, target_cols)

# 可视化比特币的预测结果
plt.figure(figsize=(12, 6))
plt.plot(y_test_inv[:, 0, 0], label="真实价格")  # 测试集真实价格(比特币)
plt.plot(y_pred_inv[:, 0, 0], label="预测价格")  # 模型预测价格(比特币)
plt.title("比特币价格预测(Transformer vs 真实值)")
plt.xlabel("时间步(测试集)")
plt.ylabel("价格(USD)")
plt.legend()
plt.grid(True)
plt.show()

# 计算预测误差
mae_bitcoin = mean_absolute_error(y_test_inv[:, 0, 0], y_pred_inv[:, 0, 0])
rmse_bitcoin = root_mean_squared_error(y_test_inv[:, 0, 0], y_pred_inv[:, 0, 0])
print(f"比特币预测MAE:{mae_bitcoin:.2f} USD | RMSE:{rmse_bitcoin:.2f} USD")

可视化结果说明
预测曲线应与真实曲线高度重合,尤其是趋势部分(比如2021年的牛市上涨、2022年的熊市下跌)。如果误差过大,可能需要调整模型参数(比如增加编码器层数、调大d_model)或添加更多特征(比如新闻文本)。

4.2 功能2:异常检测——识别异常交易

异常检测的核心是重建误差:用Transformer自编码器(Autoencoder)将输入数据重建,重建误差大的样本即为异常。

(1)自编码器模型定义
class TransformerAutoencoder(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, seq_len, dropout=0.1):
        super().__init__()
        self.encoder = TransformerAssetTracker(
            input_dim=input_dim,
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            dim_feedforward=dim_feedforward,
            output_dim=input_dim * seq_len,  # 输出维度=输入维度×序列长度
            seq_len=seq_len,
            dropout=dropout
        )
    
    def forward(self, x):
        batch_size = x.size(0)
        # 编码器输出:(batch_size, output_len, input_dim×seq_len)
        encoded = self.encoder(x)
        # 恢复为输入形状:(batch_size, seq_len, input_dim)
        decoded = encoded.view(batch_size, seq_len, -1)
        return decoded
(2)训练自编码器

自编码器的目标是"重建输入",所以损失函数是输入与重建的MSE

# 初始化自编码器
autoencoder = TransformerAutoencoder(
    input_dim=input_dim,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    dim_feedforward=dim_feedforward,
    seq_len=seq_len,
    dropout=0.1
)
autoencoder.to(device)

# 损失函数与优化器
criterion_ae = nn.MSELoss()
optimizer_ae = optim.AdamW(autoencoder.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler_ae = optim.lr_scheduler.CosineAnnealingLR(optimizer_ae, T_max=100)

# 训练循环(类似之前的流程,略)
# 注意:自编码器的输入输出都是X_train(重建输入)
(3)检测异常

用测试集计算重建误差,设定阈值(比如95分位数),超过阈值的样本即为异常:

# 计算重建误差
autoencoder.eval()
with torch.no_grad():
    X_test_ae = X_test.to(device)
    X_recon = autoencoder(X_test_ae)
    # 每个样本的重建误差(MSE)
    reconstruction_errors = criterion_ae(X_recon, X_test_ae).cpu().numpy()

# 设定异常阈值(95分位数:只有5%的样本超过该值)
threshold = np.percentile(reconstruction_errors, 95)
anomalies = reconstruction_errors > threshold

# 可视化异常点(比特币价格)
plt.figure(figsize=(12, 6))
plt.plot(y_test_inv[:, 0, 0], label="真实价格")
# 标记异常点(红色散点)
plt.scatter(
    np.where(anomalies)[0],
    y_test_inv[anomalies, 0, 0],
    color="red",
    label="异常点"
)
plt.title("比特币价格异常检测(Transformer自编码器)")
plt.xlabel("时间步(测试集)")
plt.ylabel("价格(USD)")
plt.legend()
plt.grid(True)
plt.show()

异常点说明
红色点对应"异常交易",比如2021年5月的比特币暴跌(马斯克宣布特斯拉停止接受比特币支付)、2022年11月的FTX破产事件。这些点的重建误差远高于阈值,说明模型捕捉到了异常。

4.3 功能3:关联分析——可视化多资产注意力

Transformer的自注意力权重能直观展示"模型关注了哪些数据"。我们可以用注意力权重分析多资产间的关联(比如比特币与以太坊的联动)。

(1)修改模型以返回注意力权重

PyTorch的TransformerEncoderLayer默认不返回注意力权重,需要修改模型:

class TransformerWithAttention(TransformerAssetTracker):
    def forward(self, x):
        batch_size = x.size(0)
        x = self.embedding(x)
        x = x + self.pos_encoder
        x = self.dropout(x)
        
        # 存储每一层的注意力权重
        attention_weights = []
        for layer in self.transformer_encoder.layers:
            # 提取自注意力权重(MultiheadAttention的forward返回(output, attn_weights))
            attn_output, attn = layer.self_attn(x, x, x)  # attn shape: (batch_size, nhead, seq_len, seq_len)
            attention_weights.append(attn)
            # 继续处理编码器层的其他部分
            x = layer.norm1(x + layer.dropout1(attn_output))
            ff_output = layer.linear2(layer.dropout(layer.relu(layer.linear1(x))))
            x = layer.norm2(x + layer.dropout2(ff_output))
        
        encoder_out = x
        encoder_out_flat = encoder_out.view(batch_size, -1)
        out = self.fc(encoder_out_flat)
        out = out.view(batch_size, output_len, -1)
        return out, attention_weights
(2)可视化注意力权重

取测试集第一个样本,可视化最后一层的注意力权重:

# 加载带注意力的模型
model_attn = TransformerWithAttention(
    input_dim=input_dim,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    dim_feedforward=dim_feedforward,
    output_dim=output_dim,
    seq_len=seq_len,
    dropout=0.1
)
model_attn.load_state_dict(torch.load("best_transformer_model.pth"))
model_attn.to(device)
model_attn.eval()

# 获取第一个样本的注意力权重
with torch.no_grad():
    sample_X = X_test[0:1].to(device)  # 取第一个样本(batch_size=1)
    _, attention_weights = model_attn(sample_X)

# 取最后一层的注意力权重(shape: [1, 4, 30, 30])
last_layer_attn = attention_weights[-1].squeeze(0)  # 去掉batch维度 → [4, 30, 30]
# 平均所有头的注意力权重 → [30, 30]
avg_attn = last_layer_attn.mean(dim=0).cpu().numpy()

# 可视化注意力热力图
plt.figure(figsize=(10, 8))
plt.imshow(avg_attn, cmap="viridis")
plt.title("Transformer最后一层注意力权重(平均多头)")
plt.xlabel("目标时间步(预测时关注的时间点)")
plt.ylabel("源时间步(输入的时间点)")
plt.colorbar(label="注意力权重(0=不关注,1=高度关注)")
plt.show()

热力图说明

  • 对角线附近的红色区域:模型关注"最近几天"的数据(比如预测第30天的价格,关注第29天的数据);
  • 非对角线的红色区域:模型关注"长周期事件"(比如预测第30天的价格,关注第10天的政策事件);
  • 多资产关联:如果比特币的注意力权重与以太坊的高度重叠,说明模型捕捉到了两者的联动。

步骤五:系统部署——从模型到API服务

为了让业务方使用我们的系统,需要将模型封装为RESTful API。我们用FastAPI实现:

5.1 定义API接口
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np

# 初始化FastAPI应用
app = FastAPI(title="智能数字资产追踪系统API", version="1.0")

# 加载模型和归一化器
model.load_state_dict(torch.load("best_transformer_model.pth"))
model.to(device)
model.eval()
scaler = joblib.load("scaler.pkl")
original_cols = processed_df.columns.tolist()
target_cols = [f"{coin}_price" for coin in coin_ids]

# 定义请求体模型(Pydantic)
class AssetDataRequest(BaseModel):
    data: list  # 输入数据(shape: [seq_len, input_dim] → [30, 10])
    coin_ids: list  # 资产ID列表(比如["bitcoin", "ethereum"])

# 定义响应体模型
class PredictionResponse(BaseModel):
    predictions: list  # 预测价格(shape: [output_len, n_assets] → [1, 2])
    coin_ids: list     # 资产ID
    timestamp: str     # 预测时间

# 预测接口
@app.post("/predict", response_model=PredictionResponse)
async def predict_asset_price(request: AssetDataRequest):
    try:
        # 验证输入数据形状
        data = np.array(request.data)
        if data.shape != (seq_len, input_dim):
            raise HTTPException(
                status_code=400,
                detail=f"输入数据形状错误,应为({seq_len}, {input_dim}),实际为{data.shape}"
            )
        
        # 归一化
        scaled_data = scaler.transform(data)
        
        # 转换为PyTorch张量
        tensor_data = torch.tensor(scaled_data, dtype=torch.float32).unsqueeze(0).to(device)
        
        # 预测
        with torch.no_grad():
            prediction = model(tensor_data)
        
        # 反归一化
        prediction_np = prediction.cpu().numpy()
        prediction_inv = inverse_transform_prediction(prediction_np, scaler, original_cols, target_cols)
        
        # 构造响应
        return PredictionResponse(
            predictions=prediction_inv.tolist(),
            coin_ids=request.coin_ids,
            timestamp=pd.Timestamp.now().isoformat()
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# 异常检测接口(类似预测接口,略)
# 关联分析接口(类似预测接口,略)
5.2 运行API服务

用Uvicorn运行FastAPI应用:

uvicorn main:app --host 0.0.0.0 --port 8000 --reload

访问http://localhost:8000/docs,即可看到自动生成的API文档(Swagger UI),可以直接测试接口:

  • 输入示例(30天×10个特征的归一化数据);
  • 点击"Execute",即可得到预测结果。

进阶探讨:让系统更强大的3个方向

1. 多模态融合:加入新闻文本数据

金融资产价格受事件驱动(比如政策、新闻、社交媒体情绪),仅用价格数据不够。我们可以用BERT处理新闻文本,提取文本特征,与价格特征拼接:

from transformers import BertTokenizer, BertModel

# 加载BERT模型和Tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BertModel.from_pretrained("bert-base-uncased").to(device)

def get_text_features(texts):
    """用BERT提取文本特征"""
    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        outputs = bert_model(**inputs)
    # 取[CLS] token的嵌入作为文本特征(shape: [batch_size, 768])
    return outputs.last_hidden_state[:, 0, :].cpu().numpy()

# 假设每个时间步对应一条新闻,将文本特征与价格特征拼接
# 新的输入特征维度 = 价格特征维度(10) + 文本特征维度(768) = 778
# 调整模型的input_dim为778即可

2. 增量学习:适应流式数据

金融数据是流式的(每天产生新数据),重新训练整个模型成本高。我们可以用增量学习(Incremental Learning):

  • 冻结模型的底层参数(比如编码器层);
  • 仅微调顶层的输出层;
  • 用新数据迭代训练,保持模型的泛化能力。

3. 模型压缩:部署到边缘设备

Transformer模型较大(比如我们的模型约10MB),如果要部署到手机或边缘设备,可以用模型压缩技术:

  • 剪枝(Pruning):去掉不重要的权重(比如注意力权重<0.01的连接);
  • 量化(Quantization):将32位浮点数转换为8位整数,减少模型大小;
  • 知识蒸馏(Knowledge Distillation):用大模型(Teacher)训练小模型(Student),保持性能。

总结:从论文到落地的关键一步

通过本文的实战,我们完成了智能数字资产追踪系统的全流程:

  1. 数据准备:从CoinGecko获取数据,处理缺失值、添加技术指标、归一化;
  2. 模型设计:用Transformer适配金融时间序列,加入可学习的位置编码;
  3. 训练优化:用AdamW、余弦退火、早停机制解决过拟合;
  4. 功能实现:趋势预测、异常检测、关联分析;
  5. 系统部署:用FastAPI封装为API服务。

你现在可以做什么?

  • 替换数据集:比如用股票数据(从Yahoo Finance获取)、NFT地板价数据(从OpenSea获取);
  • 调整模型参数:比如增加编码器层数、调大d_model,提升预测 accuracy;
  • 扩展功能:比如加入风险评估(VaR/CVaR)、资产组合优化。

行动号召:一起探索Transformer的边界

如果你在实践中遇到问题,欢迎在评论区留言讨论!
如果你想获取完整代码和数据集,可以关注我的公众号AI架构师实战,回复"资产追踪"获取。
如果你对Transformer的其他应用场景(比如NLP、计算机视觉)感兴趣,也可以加入我的技术交流群,一起探索AI的边界!

最后一句话
AI的价值,在于将论文中的"可能性"转化为业务中的"实用性"。希望本文能成为你从"调包侠"到"AI架构师"的关键一步!

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐