实战案例:AI架构师用Transformer实现智能数字资产追踪系统
数字资产追踪的核心任务趋势预测:给定过去30天的价格/交易量数据,预测未来1天的资产价格;异常检测:识别异常交易(比如大额抛售、洗币),用"重建误差"标记异常点;关联分析:分析多资产间的关联(比如比特币与以太坊的价格联动),用注意力权重可视化。self,input_dim: int, # 输入特征维度(比如10)d_model: int, # Transformer隐藏维度(比如64)nhead:
《Transformer实战:AI架构师手把手教你搭建智能数字资产追踪系统》
引言:金融领域的"数据追踪困境"与Transformer的破局
你是否遇到过这样的场景?
作为金融分析师,你盯着屏幕上跳动的加密货币价格曲线,想找出比特币和以太坊的联动规律,却被24小时无休的交易数据淹没;作为量化交易者,你尝试用LSTM预测NFT地板价,却发现模型总是漏掉"某明星买入"这类长周期事件的影响;作为资产管理者,你想快速识别异常交易(比如大额洗币),却被传统时间序列模型的"短视"限制——它只能看到最近7天的数据。
数字资产追踪的核心痛点,在于长距离时间依赖(比如3个月前的政策事件对今日价格的影响)和多资产交叉关联(比如比特币暴跌引发的 altcoin 瀑布效应)。而传统的时间序列模型(ARIMA、LSTM)要么无法捕捉长周期依赖,要么难以并行处理多资产的复杂关系。
Transformer的出现,为解决这些问题提供了钥匙:
- 自注意力机制(Self-Attention)能同时关注"3个月前的政策"和"昨天的交易数据",捕捉长距离依赖;
- 多头注意力(Multi-Head Attention)能并行分析多资产间的关联,比如比特币与以太坊的价格联动、NFT与底层区块链的关系;
- 灵活的编码器-解码器结构,能同时支持趋势预测、异常检测、关联分析等多任务。
本文将带你做什么?
我们会从零开始,用Transformer搭建一个端到端的智能数字资产追踪系统,覆盖数据获取→预处理→模型设计→训练优化→功能落地→系统部署全流程。
读完本文你能获得什么?
- 掌握Transformer在金融时间序列场景的落地方法;
- 实现"趋势预测+异常检测+关联分析"三大核心功能;
- 学会将模型封装为API,快速部署到生产环境。
准备工作:你需要的技术储备与环境
1. 技术栈要求
- 基础能力:Python编程、Pandas/NumPy数据处理、PyTorch基本使用;
- 机器学习基础:了解时间序列分析(比如滑动窗口)、损失函数(MSE/MAE)、优化器(AdamW);
- Transformer基础:知道自注意力、位置编码、编码器-解码器的基本概念(不需要深入推导)。
2. 环境与工具
- Python版本:3.8+(避免兼容性问题);
- 必备库:PyTorch(1.10+)、Pandas、NumPy、Matplotlib、Scikit-learn、FastAPI、Transformers(可选,用于多模态);
- 数据来源:CoinGecko API(免费获取加密货币历史数据)、Kaggle(加密货币/NFT数据集);
- 部署工具:Uvicorn(运行FastAPI服务)、Docker(可选,容器化部署)。
3. 快速环境搭建
用conda
或pip
安装依赖:
# 安装PyTorch(根据CUDA版本调整,无GPU用cpu版)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# 安装其他库
pip install pandas numpy matplotlib scikit-learn fastapi uvicorn pycoingecko joblib transformers
核心实战:从0到1搭建智能追踪系统
步骤一:问题定义与数据准备
数字资产追踪的核心任务,我们聚焦3个高频需求:
- 趋势预测:给定过去30天的价格/交易量数据,预测未来1天的资产价格;
- 异常检测:识别异常交易(比如大额抛售、洗币),用"重建误差"标记异常点;
- 关联分析:分析多资产间的关联(比如比特币与以太坊的价格联动),用注意力权重可视化。
1.1 数据获取:用CoinGecko API拿加密货币数据
我们以**比特币(bitcoin)和以太坊(ethereum)**为例,获取2020-2023年的日度数据(Open/High/Low/Close/Volume):
import pandas as pd
from pycoingecko import CoinGeckoAPI
cg = CoinGeckoAPI()
def get_crypto_data(coin_ids, start_date, end_date):
"""获取多个加密货币的历史数据(价格+交易量)"""
data = {}
for coin_id in coin_ids:
# 调用API获取历史数据(时间戳+价格+交易量)
history = cg.get_coin_market_chart_range_by_id(
id=coin_id,
vs_currency="usd",
from_timestamp=pd.Timestamp(start_date).timestamp(),
to_timestamp=pd.Timestamp(end_date).timestamp()
)
# 转换为DataFrame
price_df = pd.DataFrame(history["prices"], columns=["timestamp", "price"])
volume_df = pd.DataFrame(history["total_volumes"], columns=["timestamp", "volume"])
# 合并价格与交易量,按时间戳索引
df = pd.merge(price_df, volume_df, on="timestamp")
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
df.set_index("timestamp", inplace=True)
data[coin_id] = df
# 合并多资产数据为宽表(列名:coin_id_feature)
combined_df = pd.concat(data.values(), axis=1, keys=data.keys())
combined_df.columns = ["_".join(col) for col in combined_df.columns]
return combined_df
# 示例:获取比特币和以太坊的2020-2023年数据
coin_ids = ["bitcoin", "ethereum"]
start_date = "2020-01-01"
end_date = "2023-01-01"
raw_df = get_crypto_data(coin_ids, start_date, end_date)
print(raw_df.head())
输出结果(部分):
timestamp | bitcoin_price | bitcoin_volume | ethereum_price | ethereum_volume |
---|---|---|---|---|
2020-01-01 00:00:00 | 7194.89 | 14245678901 | 130.04 | 4567890123 |
2020-01-02 00:00:00 | 7200.12 | 13890123456 | 132.56 | 4321098765 |
1.2 数据预处理:从原始数据到模型输入
金融数据的"脏"是出了名的——缺失值、异常值、量纲不一致。我们需要做3件事:
(1)缺失值处理
用"前向填充"(ffill)填补缺失值(假设今天的价格与昨天相关):
raw_df.fillna(method="ffill", inplace=True)
(2)特征工程:增加有效特征
仅用价格和交易量不够,我们需要加入技术指标(金融分析师常用的信号):
- 移动平均线(MA7/MA30):平滑价格曲线,反映趋势;
- 相对强弱指数(RSI):衡量资产的超买/超卖状态(范围0-100,>70超买,<30超卖)。
def add_technical_indicators(df, coin_ids):
"""为每个资产添加技术指标"""
for coin_id in coin_ids:
price_col = f"{coin_id}_price"
volume_col = f"{coin_id}_volume"
# 7天/30天移动平均线
df[f"{coin_id}_ma7"] = df[price_col].rolling(window=7).mean()
df[f"{coin_id}_ma30"] = df[price_col].rolling(window=30).mean()
# 相对强弱指数(RSI)
delta = df[price_col].diff(1) # 价格变化
gain = delta.where(delta > 0, 0) # 上涨幅度
loss = -delta.where(delta < 0, 0) # 下跌幅度
avg_gain = gain.rolling(window=14).mean() # 14天平均上涨
avg_loss = loss.rolling(window=14).mean() # 14天平均下跌
rs = avg_gain / avg_loss
df[f"{coin_id}_rsi"] = 100 - (100 / (1 + rs))
return df
# 添加技术指标
processed_df = add_technical_indicators(raw_df, coin_ids)
# 去除计算指标产生的NaN行(前30天无MA30,前14天无RSI)
processed_df.dropna(inplace=True)
(3)归一化:消除量纲影响
金融数据的量纲差异大(比如比特币价格是万美元级,交易量是十亿级),需要用MinMaxScaler
将数据缩放到[0,1]区间:
from sklearn.preprocessing import MinMaxScaler
import joblib
# 初始化归一化器
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data = scaler.fit_transform(processed_df)
# 保存归一化器(后续反归一化需要)
joblib.dump(scaler, "scaler.pkl")
(4)构建时间序列:滑动窗口法
Transformer需要序列输入(比如过去30天的数据预测未来1天)。我们用滑动窗口法生成训练数据:
import numpy as np
def create_time_sequences(data, seq_len, output_len):
"""
将时间序列转换为输入-输出对
:param data: 归一化后的特征矩阵(shape: [n_samples, n_features])
:param seq_len: 输入窗口长度(比如30天)
:param output_len: 输出窗口长度(比如1天)
:return: X (shape: [n_samples, seq_len, n_features]), y (shape: [n_samples, output_len, n_assets])
"""
X, y = [], []
n_samples = len(data) - seq_len - output_len + 1
# 遍历所有可能的窗口
for i in range(n_samples):
# 输入:前seq_len个时间步的所有特征
X.append(data[i:i+seq_len])
# 输出:接下来output_len个时间步的"价格"特征(仅预测价格)
price_cols = [processed_df.columns.get_loc(f"{coin}_price") for coin in coin_ids]
y.append(data[i+seq_len:i+seq_len+output_len, price_cols])
return np.array(X), np.array(y)
# 设定窗口参数
seq_len = 30 # 输入:过去30天
output_len = 1 # 输出:未来1天
X, y = create_time_sequences(scaled_data, seq_len, output_len)
# 划分训练/验证/测试集(时间序列必须按顺序划分,不能随机!)
train_ratio = 0.7
val_ratio = 0.2
train_size = int(train_ratio * len(X))
val_size = int(val_ratio * len(X))
X_train, y_train = X[:train_size], y[:train_size]
X_val, y_val = X[train_size:train_size+val_size], y[train_size:train_size+val_size]
X_test, y_test = X[train_size+val_size:], y[train_size+val_size:]
# 转换为PyTorch张量
import torch
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
print(f"训练集形状:X_train={X_train.shape}, y_train={y_train.shape}")
# 输出:训练集形状:X_train=(584, 30, 10), y_train=(584, 1, 2)
# 解释:584个样本,每个样本输入30天×10个特征(2资产×5特征:price/volume/ma7/ma30/rsi),输出1天×2资产价格
步骤二:Transformer模型设计——适配金融时间序列
Transformer的核心是自注意力机制,但直接用论文中的Transformer并不适配金融场景——我们需要做3点调整:
- 输入嵌入:将高维特征(比如10维)映射到Transformer的隐藏维度(d_model);
- 位置编码:金融数据是时间序列,需要手动加入位置信息(用可学习的位置编码,比固定编码更灵活);
- 输出头:针对"多资产预测"任务,输出层需要对应资产数量(比如2个资产,输出2维)。
2.1 模型结构定义(PyTorch)
import torch.nn as nn
class TransformerAssetTracker(nn.Module):
def __init__(
self,
input_dim: int, # 输入特征维度(比如10)
d_model: int, # Transformer隐藏维度(比如64)
nhead: int, # 多头注意力头数(必须整除d_model,比如4)
num_encoder_layers: int, # 编码器层数(比如3)
dim_feedforward: int, # 前馈神经网络隐藏维度(比如256)
output_dim: int, # 输出维度(比如2:2个资产)
seq_len: int, # 输入窗口长度(比如30)
dropout: float = 0.1 # Dropout率
):
super().__init__()
self.d_model = d_model
# 1. 输入嵌入:将输入特征映射到d_model维度
self.embedding = nn.Linear(input_dim, d_model)
# 2. 可学习的位置编码:(1, seq_len, d_model),适配所有 batch
self.pos_encoder = nn.Parameter(torch.randn(1, seq_len, d_model))
# 3. Transformer编码器层(带残差连接+层归一化)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
batch_first=True # 输入形状:(batch_size, seq_len, d_model)(必须开!)
)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)
# 4. 输出层:将编码器输出映射到目标维度
self.fc = nn.Linear(d_model * seq_len, output_dim) # 展平所有时间步的特征
self.dropout = nn.Dropout(dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
前向传播
:param x: 输入张量(shape: [batch_size, seq_len, input_dim])
:return: 输出张量(shape: [batch_size, output_len, output_dim])
"""
batch_size = x.size(0)
# 输入嵌入 + 位置编码
x = self.embedding(x) # (batch_size, seq_len, d_model)
x = x + self.pos_encoder # 加入位置信息
x = self.dropout(x)
# Transformer编码器
encoder_out = self.transformer_encoder(x) # (batch_size, seq_len, d_model)
# 展平编码器输出(seq_len × d_model → 1维)
encoder_out_flat = encoder_out.view(batch_size, -1) # (batch_size, seq_len × d_model)
# 输出层:映射到目标维度
out = self.fc(encoder_out_flat) # (batch_size, output_dim)
# 调整形状为(batch_size, output_len, output_dim)(适配y的形状)
out = out.view(batch_size, output_len, -1)
return out
2.2 模型初始化
根据我们的数据参数,初始化模型:
# 模型参数
input_dim = X_train.shape[2] # 10(2资产×5特征)
d_model = 64 # 隐藏维度(可调整,比如32/128)
nhead = 4 # 多头数(64÷4=16,必须整除)
num_encoder_layers = 3 # 编码器层数(可调整,比如2/4)
dim_feedforward = 256 # 前馈隐藏维度(可调整,比如128/512)
output_dim = len(coin_ids) # 2(比特币+以太坊)
# 创建模型
model = TransformerAssetTracker(
input_dim=input_dim,
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
dim_feedforward=dim_feedforward,
output_dim=output_dim,
seq_len=seq_len,
dropout=0.1
)
# 移动模型到GPU(如果有)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(model)
步骤三:模型训练与优化——解决过拟合与梯度爆炸
金融数据的噪声大、非平稳性(比如牛市/熊市的分布差异),训练时容易过拟合。我们需要用以下技巧:
3.1 损失函数与优化器
- 损失函数:回归任务用MSE(均方误差)(惩罚大误差);
- 优化器:用AdamW(带权重衰减的Adam,缓解过拟合);
- 学习率调度:用余弦退火(逐渐降低学习率,避免后期震荡)。
import torch.optim as optim
# 损失函数
criterion = nn.MSELoss()
# 优化器(权重衰减=1e-5,缓解过拟合)
optimizer = optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)
# 学习率调度(余弦退火,T_max=100:100个epoch后学习率降到最小值)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
3.2 数据加载器(避免内存爆炸)
用PyTorch的DataLoader
批量加载数据,避免一次性加载全部数据:
from torch.utils.data import DataLoader, TensorDataset
# 训练集DataLoader(时间序列不能shuffle!)
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)
# 验证集DataLoader
val_dataset = TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
3.3 训练循环(带早停机制)
早停(Early Stopping):当验证集损失连续多轮不下降时,停止训练,避免过拟合。
from tqdm import tqdm
# 训练参数
epochs = 200
patience = 20 # 连续20轮验证损失不下降则停止
best_val_loss = float("inf")
patience_counter = 0
# 训练循环
for epoch in range(epochs):
# 1. 训练模式
model.train()
train_loss = 0.0
for batch_X, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):
# 移动数据到GPU
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)
# 梯度清零
optimizer.zero_grad()
# 前向传播
outputs = model(batch_X)
# 计算损失
loss = criterion(outputs, batch_y)
# 反向传播+梯度裁剪(防止梯度爆炸)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) # 梯度裁剪到1以内
# 更新参数
optimizer.step()
# 累加训练损失
train_loss += loss.item() * batch_X.size(0)
# 计算平均训练损失
train_loss /= len(train_loader.dataset)
# 2. 验证模式(关闭Dropout)
model.eval()
val_loss = 0.0
with torch.no_grad(): # 不计算梯度
for batch_X, batch_y in val_loader:
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
val_loss += loss.item() * batch_X.size(0)
# 计算平均验证损失
val_loss /= len(val_loader.dataset)
# 3. 学习率调度
scheduler.step()
# 打印日志
print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
# 4. 早停检查
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
# 保存最佳模型
torch.save(model.state_dict(), "best_transformer_model.pth")
print("→ 保存最佳模型!")
else:
patience_counter += 1
if patience_counter >= patience:
print("→ 早停触发!")
break
步骤四:核心功能实现——从模型到业务价值
训练好模型后,我们需要将其转化为可落地的功能:趋势预测、异常检测、关联分析。
4.1 功能1:趋势预测——预测未来1天价格
预测的关键是反归一化(将模型输出的[0,1]值转换为真实价格)。
import joblib
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, root_mean_squared_error
# 加载最佳模型和归一化器
model.load_state_dict(torch.load("best_transformer_model.pth"))
model.to(device)
model.eval()
scaler = joblib.load("scaler.pkl")
def inverse_transform_prediction(preds, scaler, original_cols, target_cols):
"""
将模型输出的归一化值反转为真实价格
:param preds: 模型输出(shape: [n_samples, output_len, n_assets])
:param scaler: 训练好的归一化器
:param original_cols: 原始数据的列名列表
:param target_cols: 目标列名(比如["bitcoin_price", "ethereum_price"])
:return: 反归一化后的真实价格(shape: [n_samples, output_len, n_assets])
"""
# 展平 preds 为二维(n_samples×output_len, n_assets)
preds_flat = preds.reshape(-1, len(target_cols))
# 创建dummy矩阵(填充目标列,其他列用0)
dummy = np.zeros((preds_flat.shape[0], len(original_cols)))
for i, col in enumerate(target_cols):
dummy[:, original_cols.index(col)] = preds_flat[:, i]
# 反归一化
inv_dummy = scaler.inverse_transform(dummy)
# 提取目标列并恢复形状
inv_preds = inv_dummy[:, [original_cols.index(col) for col in target_cols]]
return inv_preds.reshape(preds.shape)
# 预测测试集
with torch.no_grad():
X_test = X_test.to(device)
y_test = y_test.to(device)
y_pred = model(X_test)
# 反归一化
original_cols = processed_df.columns.tolist()
target_cols = [f"{coin}_price" for coin in coin_ids]
y_test_inv = inverse_transform_prediction(y_test.cpu().numpy(), scaler, original_cols, target_cols)
y_pred_inv = inverse_transform_prediction(y_pred.cpu().numpy(), scaler, original_cols, target_cols)
# 可视化比特币的预测结果
plt.figure(figsize=(12, 6))
plt.plot(y_test_inv[:, 0, 0], label="真实价格") # 测试集真实价格(比特币)
plt.plot(y_pred_inv[:, 0, 0], label="预测价格") # 模型预测价格(比特币)
plt.title("比特币价格预测(Transformer vs 真实值)")
plt.xlabel("时间步(测试集)")
plt.ylabel("价格(USD)")
plt.legend()
plt.grid(True)
plt.show()
# 计算预测误差
mae_bitcoin = mean_absolute_error(y_test_inv[:, 0, 0], y_pred_inv[:, 0, 0])
rmse_bitcoin = root_mean_squared_error(y_test_inv[:, 0, 0], y_pred_inv[:, 0, 0])
print(f"比特币预测MAE:{mae_bitcoin:.2f} USD | RMSE:{rmse_bitcoin:.2f} USD")
可视化结果说明:
预测曲线应与真实曲线高度重合,尤其是趋势部分(比如2021年的牛市上涨、2022年的熊市下跌)。如果误差过大,可能需要调整模型参数(比如增加编码器层数、调大d_model)或添加更多特征(比如新闻文本)。
4.2 功能2:异常检测——识别异常交易
异常检测的核心是重建误差:用Transformer自编码器(Autoencoder)将输入数据重建,重建误差大的样本即为异常。
(1)自编码器模型定义
class TransformerAutoencoder(nn.Module):
def __init__(self, input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, seq_len, dropout=0.1):
super().__init__()
self.encoder = TransformerAssetTracker(
input_dim=input_dim,
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
dim_feedforward=dim_feedforward,
output_dim=input_dim * seq_len, # 输出维度=输入维度×序列长度
seq_len=seq_len,
dropout=dropout
)
def forward(self, x):
batch_size = x.size(0)
# 编码器输出:(batch_size, output_len, input_dim×seq_len)
encoded = self.encoder(x)
# 恢复为输入形状:(batch_size, seq_len, input_dim)
decoded = encoded.view(batch_size, seq_len, -1)
return decoded
(2)训练自编码器
自编码器的目标是"重建输入",所以损失函数是输入与重建的MSE:
# 初始化自编码器
autoencoder = TransformerAutoencoder(
input_dim=input_dim,
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
dim_feedforward=dim_feedforward,
seq_len=seq_len,
dropout=0.1
)
autoencoder.to(device)
# 损失函数与优化器
criterion_ae = nn.MSELoss()
optimizer_ae = optim.AdamW(autoencoder.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler_ae = optim.lr_scheduler.CosineAnnealingLR(optimizer_ae, T_max=100)
# 训练循环(类似之前的流程,略)
# 注意:自编码器的输入输出都是X_train(重建输入)
(3)检测异常
用测试集计算重建误差,设定阈值(比如95分位数),超过阈值的样本即为异常:
# 计算重建误差
autoencoder.eval()
with torch.no_grad():
X_test_ae = X_test.to(device)
X_recon = autoencoder(X_test_ae)
# 每个样本的重建误差(MSE)
reconstruction_errors = criterion_ae(X_recon, X_test_ae).cpu().numpy()
# 设定异常阈值(95分位数:只有5%的样本超过该值)
threshold = np.percentile(reconstruction_errors, 95)
anomalies = reconstruction_errors > threshold
# 可视化异常点(比特币价格)
plt.figure(figsize=(12, 6))
plt.plot(y_test_inv[:, 0, 0], label="真实价格")
# 标记异常点(红色散点)
plt.scatter(
np.where(anomalies)[0],
y_test_inv[anomalies, 0, 0],
color="red",
label="异常点"
)
plt.title("比特币价格异常检测(Transformer自编码器)")
plt.xlabel("时间步(测试集)")
plt.ylabel("价格(USD)")
plt.legend()
plt.grid(True)
plt.show()
异常点说明:
红色点对应"异常交易",比如2021年5月的比特币暴跌(马斯克宣布特斯拉停止接受比特币支付)、2022年11月的FTX破产事件。这些点的重建误差远高于阈值,说明模型捕捉到了异常。
4.3 功能3:关联分析——可视化多资产注意力
Transformer的自注意力权重能直观展示"模型关注了哪些数据"。我们可以用注意力权重分析多资产间的关联(比如比特币与以太坊的联动)。
(1)修改模型以返回注意力权重
PyTorch的TransformerEncoderLayer
默认不返回注意力权重,需要修改模型:
class TransformerWithAttention(TransformerAssetTracker):
def forward(self, x):
batch_size = x.size(0)
x = self.embedding(x)
x = x + self.pos_encoder
x = self.dropout(x)
# 存储每一层的注意力权重
attention_weights = []
for layer in self.transformer_encoder.layers:
# 提取自注意力权重(MultiheadAttention的forward返回(output, attn_weights))
attn_output, attn = layer.self_attn(x, x, x) # attn shape: (batch_size, nhead, seq_len, seq_len)
attention_weights.append(attn)
# 继续处理编码器层的其他部分
x = layer.norm1(x + layer.dropout1(attn_output))
ff_output = layer.linear2(layer.dropout(layer.relu(layer.linear1(x))))
x = layer.norm2(x + layer.dropout2(ff_output))
encoder_out = x
encoder_out_flat = encoder_out.view(batch_size, -1)
out = self.fc(encoder_out_flat)
out = out.view(batch_size, output_len, -1)
return out, attention_weights
(2)可视化注意力权重
取测试集第一个样本,可视化最后一层的注意力权重:
# 加载带注意力的模型
model_attn = TransformerWithAttention(
input_dim=input_dim,
d_model=d_model,
nhead=nhead,
num_encoder_layers=num_encoder_layers,
dim_feedforward=dim_feedforward,
output_dim=output_dim,
seq_len=seq_len,
dropout=0.1
)
model_attn.load_state_dict(torch.load("best_transformer_model.pth"))
model_attn.to(device)
model_attn.eval()
# 获取第一个样本的注意力权重
with torch.no_grad():
sample_X = X_test[0:1].to(device) # 取第一个样本(batch_size=1)
_, attention_weights = model_attn(sample_X)
# 取最后一层的注意力权重(shape: [1, 4, 30, 30])
last_layer_attn = attention_weights[-1].squeeze(0) # 去掉batch维度 → [4, 30, 30]
# 平均所有头的注意力权重 → [30, 30]
avg_attn = last_layer_attn.mean(dim=0).cpu().numpy()
# 可视化注意力热力图
plt.figure(figsize=(10, 8))
plt.imshow(avg_attn, cmap="viridis")
plt.title("Transformer最后一层注意力权重(平均多头)")
plt.xlabel("目标时间步(预测时关注的时间点)")
plt.ylabel("源时间步(输入的时间点)")
plt.colorbar(label="注意力权重(0=不关注,1=高度关注)")
plt.show()
热力图说明:
- 对角线附近的红色区域:模型关注"最近几天"的数据(比如预测第30天的价格,关注第29天的数据);
- 非对角线的红色区域:模型关注"长周期事件"(比如预测第30天的价格,关注第10天的政策事件);
- 多资产关联:如果比特币的注意力权重与以太坊的高度重叠,说明模型捕捉到了两者的联动。
步骤五:系统部署——从模型到API服务
为了让业务方使用我们的系统,需要将模型封装为RESTful API。我们用FastAPI实现:
5.1 定义API接口
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
# 初始化FastAPI应用
app = FastAPI(title="智能数字资产追踪系统API", version="1.0")
# 加载模型和归一化器
model.load_state_dict(torch.load("best_transformer_model.pth"))
model.to(device)
model.eval()
scaler = joblib.load("scaler.pkl")
original_cols = processed_df.columns.tolist()
target_cols = [f"{coin}_price" for coin in coin_ids]
# 定义请求体模型(Pydantic)
class AssetDataRequest(BaseModel):
data: list # 输入数据(shape: [seq_len, input_dim] → [30, 10])
coin_ids: list # 资产ID列表(比如["bitcoin", "ethereum"])
# 定义响应体模型
class PredictionResponse(BaseModel):
predictions: list # 预测价格(shape: [output_len, n_assets] → [1, 2])
coin_ids: list # 资产ID
timestamp: str # 预测时间
# 预测接口
@app.post("/predict", response_model=PredictionResponse)
async def predict_asset_price(request: AssetDataRequest):
try:
# 验证输入数据形状
data = np.array(request.data)
if data.shape != (seq_len, input_dim):
raise HTTPException(
status_code=400,
detail=f"输入数据形状错误,应为({seq_len}, {input_dim}),实际为{data.shape}"
)
# 归一化
scaled_data = scaler.transform(data)
# 转换为PyTorch张量
tensor_data = torch.tensor(scaled_data, dtype=torch.float32).unsqueeze(0).to(device)
# 预测
with torch.no_grad():
prediction = model(tensor_data)
# 反归一化
prediction_np = prediction.cpu().numpy()
prediction_inv = inverse_transform_prediction(prediction_np, scaler, original_cols, target_cols)
# 构造响应
return PredictionResponse(
predictions=prediction_inv.tolist(),
coin_ids=request.coin_ids,
timestamp=pd.Timestamp.now().isoformat()
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# 异常检测接口(类似预测接口,略)
# 关联分析接口(类似预测接口,略)
5.2 运行API服务
用Uvicorn运行FastAPI应用:
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
访问http://localhost:8000/docs
,即可看到自动生成的API文档(Swagger UI),可以直接测试接口:
- 输入示例(30天×10个特征的归一化数据);
- 点击"Execute",即可得到预测结果。
进阶探讨:让系统更强大的3个方向
1. 多模态融合:加入新闻文本数据
金融资产价格受事件驱动(比如政策、新闻、社交媒体情绪),仅用价格数据不够。我们可以用BERT处理新闻文本,提取文本特征,与价格特征拼接:
from transformers import BertTokenizer, BertModel
# 加载BERT模型和Tokenizer
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
bert_model = BertModel.from_pretrained("bert-base-uncased").to(device)
def get_text_features(texts):
"""用BERT提取文本特征"""
inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
with torch.no_grad():
outputs = bert_model(**inputs)
# 取[CLS] token的嵌入作为文本特征(shape: [batch_size, 768])
return outputs.last_hidden_state[:, 0, :].cpu().numpy()
# 假设每个时间步对应一条新闻,将文本特征与价格特征拼接
# 新的输入特征维度 = 价格特征维度(10) + 文本特征维度(768) = 778
# 调整模型的input_dim为778即可
2. 增量学习:适应流式数据
金融数据是流式的(每天产生新数据),重新训练整个模型成本高。我们可以用增量学习(Incremental Learning):
- 冻结模型的底层参数(比如编码器层);
- 仅微调顶层的输出层;
- 用新数据迭代训练,保持模型的泛化能力。
3. 模型压缩:部署到边缘设备
Transformer模型较大(比如我们的模型约10MB),如果要部署到手机或边缘设备,可以用模型压缩技术:
- 剪枝(Pruning):去掉不重要的权重(比如注意力权重<0.01的连接);
- 量化(Quantization):将32位浮点数转换为8位整数,减少模型大小;
- 知识蒸馏(Knowledge Distillation):用大模型(Teacher)训练小模型(Student),保持性能。
总结:从论文到落地的关键一步
通过本文的实战,我们完成了智能数字资产追踪系统的全流程:
- 数据准备:从CoinGecko获取数据,处理缺失值、添加技术指标、归一化;
- 模型设计:用Transformer适配金融时间序列,加入可学习的位置编码;
- 训练优化:用AdamW、余弦退火、早停机制解决过拟合;
- 功能实现:趋势预测、异常检测、关联分析;
- 系统部署:用FastAPI封装为API服务。
你现在可以做什么?
- 替换数据集:比如用股票数据(从Yahoo Finance获取)、NFT地板价数据(从OpenSea获取);
- 调整模型参数:比如增加编码器层数、调大d_model,提升预测 accuracy;
- 扩展功能:比如加入风险评估(VaR/CVaR)、资产组合优化。
行动号召:一起探索Transformer的边界
如果你在实践中遇到问题,欢迎在评论区留言讨论!
如果你想获取完整代码和数据集,可以关注我的公众号AI架构师实战,回复"资产追踪"获取。
如果你对Transformer的其他应用场景(比如NLP、计算机视觉)感兴趣,也可以加入我的技术交流群,一起探索AI的边界!
最后一句话:
AI的价值,在于将论文中的"可能性"转化为业务中的"实用性"。希望本文能成为你从"调包侠"到"AI架构师"的关键一步!
更多推荐
所有评论(0)