Predicting the AirPassengers Dataset with an LSTM
Author: 小泽
https://github.com/xiaoze-xiaoze/Deep-Learning-for-Beginners
In this post we'll write LSTM code to forecast the AirPassengers dataset. I had actually already written a version on ETTh1, a dataset of electricity transformer load and oil temperature, but that dataset is fairly large and the oil temperature is driven more by the load than by its own history, so univariate forecasting on it didn't work well. I therefore switched to a lighter-weight dataset, AirPassengers, which records the monthly number of international airline passengers (in thousands) from 1949 to 1960 and shows clear seasonality and trend. I think it's better suited for getting started; we can come back to ETTh1 later.
Without further ado, let's look at the code.
Import the modules and define the device:
import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
Download the dataset:
def data():
    # Download the raw CSV and save it to a local file
    url = "https://raw.githubusercontent.com/Techtonique/datasets/main/time_series/univariate/AirPassengers.csv"
    save_path = r"D:\Deep Learning Notes\01 Deep Learning for Beginners\00 Data\AirPassengers\AirPassengers.csv"
    r = requests.get(url, allow_redirects=True)
    open(save_path, 'wb').write(r.content)
You can change r"D:\Deep Learning Notes\01 Deep Learning for Beginners\00 Data\AirPassengers\AirPassengers.csv" to your own save path.
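One small caveat: requests and open won't create missing directories for you. A minimal sketch that creates the folder first (the relative path data/AirPassengers.csv below is my own placeholder, not from the original code):

from pathlib import Path
import requests

url = "https://raw.githubusercontent.com/Techtonique/datasets/main/time_series/univariate/AirPassengers.csv"
save_path = Path("data/AirPassengers.csv")             # hypothetical relative path
save_path.parent.mkdir(parents=True, exist_ok=True)    # create the parent folder if it doesn't exist
save_path.write_bytes(requests.get(url, allow_redirects=True).content)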
Read the data and normalize it:
df = pd.read_csv(r"D:\Deep Learning Notes\01 Deep Learning for Beginners\00 Data\AirPassengers\AirPassengers.csv")
passengers = df.iloc[:, 1].values.reshape(-1, 1)  # second column holds the passenger counts; reshape to (n_samples, 1) for the scaler
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(passengers)
Strictly speaking, AirPassengers doesn't need normalization to unify units, since there is only one variable. But we are feeding the data to a scale-sensitive model, so normalizing helps keep gradients well-behaved, lets training converge more easily, and keeps errors comparable. I haven't tested how well it works without normalization; feel free to try it yourself.
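As a quick sanity check on what the scaler does, here is a standalone sketch using the first five monthly values of the series: MinMaxScaler maps the fitted minimum to 0 and maximum to 1, and inverse_transform recovers the original scale.

import numpy as np
from sklearn.preprocessing import MinMaxScaler

demo = np.array([[112.], [118.], [132.], [129.], [121.]])  # first five monthly counts
sc = MinMaxScaler(feature_range=(0, 1))
scaled_demo = sc.fit_transform(demo)              # min -> 0.0, max -> 1.0
print(scaled_demo.ravel())                        # [0.   0.3  1.   0.85 0.45]
print(sc.inverse_transform(scaled_demo).ravel())  # [112. 118. 132. 129. 121.]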
Then split the training and test sets:
split = 120
train_data = scaled[:split]
test_data = scaled[split:]
There are 144 data points in total; I use the first 120 as the training set and the last 24 as the test set. In time series forecasting we must split train and test strictly along the time axis: the model must never see "future" data, otherwise its measured test performance will look overly optimistic. Checking for data leakage is therefore one of the most important steps in time series forecasting, arguably even more important than building the model itself; I have stepped on a great many pitfalls here.
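A subtle example of this: the code above fits MinMaxScaler on the full series before splitting, so the test set's min and max leak into the scaling. The effect on this dataset is small, but a stricter variant (a sketch reusing the names defined above) fits the scaler on the training rows only:

split = 120
scaler = MinMaxScaler(feature_range=(0, 1))
train_data = scaler.fit_transform(passengers[:split])  # statistics come from the training portion only
test_data = scaler.transform(passengers[split:])       # test set reuses the training min/max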
Now for the most critical data-processing step: the sliding window.
def make_dataset(data, seq_length):
    x, y = [], []
    for i in range(len(data) - seq_length):
        x.append(data[i:i+seq_length])    # a window of seq_length past values
        y.append(data[i+seq_length])      # the value right after the window
    return np.array(x), np.array(y)

x_train, y_train = make_dataset(train_data, 12)
x_test, y_test = make_dataset(test_data, 12)
Think for a moment about why we need a sliding window. Even though the LSTM makes quite sophisticated improvements on the plain RNN, its real-world capacity is, frankly, still limited. If we stuff the memory of an entire timeline into its head, it mostly won't retain it: distant details blur away, and this particular problem doesn't need very distant history anyway. A full history also creates the problem of early memories and recent experience "fighting" each other; simply put, the rules of childhood no longer fit the adult world, and that confuses the model's judgment. Shortening the context reduces interference from ancient noise, and overly long inputs also make gradient problems (vanishing or exploding gradients) far more likely. In a sense the sliding window plays a role a bit like an "attention" mechanism.
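To make the windowing concrete, here is a tiny standalone example (toy numbers, not the real data) of what make_dataset produces:

import numpy as np

toy = np.arange(6).reshape(-1, 1)     # [[0], [1], [2], [3], [4], [5]]
x, y = make_dataset(toy, 3)
print(x.shape, y.shape)               # (3, 3, 1) (3, 1)
print(x[0].ravel(), '->', y[0])       # [0 1 2] -> [3]
print(x[1].ravel(), '->', y[1])       # [1 2 3] -> [4]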
Convert the processed data to tensors:
x_train = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
x_test = torch.tensor(x_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).to(device)
Define the model:
class LSTM(nn.Module):
    def __init__(self, num_layers=1):
        super(LSTM, self).__init__()
        self.num_layers = num_layers
        self.lstm = nn.LSTM(1, 64, num_layers, batch_first=True)  # input_size=1, hidden_size=64
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        out, _ = self.lstm(x)       # out: (batch, seq_len, hidden_size)
        last = out[:, -1, :]        # keep only the hidden state at the last time step
        return self.fc(last)

model = LSTM().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
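A quick shape check (optional, my own sanity-check sketch) confirms the model maps a (batch, seq_len, 1) window to one value per sample:

dummy = torch.randn(4, 12, 1).to(device)   # 4 samples, 12 months each, 1 feature
with torch.no_grad():
    print(model(dummy).shape)              # torch.Size([4, 1])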
Write the training function:
def train():
    losses = []
    for epoch in range(1, 1201):
        model.train()
        pred = model(x_train)              # full-batch forward pass
        loss = criterion(pred, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        if epoch % 100 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}')
    plt.figure(figsize=(10, 6))
    plt.title('Loss')
    plt.plot(losses)
    plt.show()
Write the test function:
def test():
    model.eval()
    with torch.no_grad():
        train_pred = model(x_train)
        test_pred = model(x_test)
    # Map predictions and targets back to the original passenger scale
    train_pred = scaler.inverse_transform(train_pred.cpu().numpy())
    y_train_inv = scaler.inverse_transform(y_train.cpu().numpy())
    test_pred = scaler.inverse_transform(test_pred.cpu().numpy())
    y_test_inv = scaler.inverse_transform(y_test.cpu().numpy())
    full_actual = np.concatenate([y_train_inv, y_test_inv])
    full_pred = np.concatenate([train_pred, test_pred])
    plt.figure(figsize=(10, 6))
    plt.title('Air Passengers')
    plt.plot(full_actual, label='Actual')
    plt.plot(full_pred, label='Predicted')
    plt.legend()
    plt.show()
    print(f'RMSE: {np.sqrt(mean_squared_error(y_test_inv, test_pred))}')
    print(f'MAE: {mean_absolute_error(y_test_inv, test_pred)}')
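Note that this evaluates one-step-ahead prediction: every test window is built from true observed values. If you want a genuine multi-step forecast that feeds its own predictions back in, a rough sketch (my own addition, not part of the original script) looks like this:

def recursive_forecast(model, window, steps):
    # window: tensor of shape (1, seq_len, 1) holding the last observed values
    model.eval()
    preds = []
    with torch.no_grad():
        for _ in range(steps):
            nxt = model(window)                                       # (1, 1)
            preds.append(nxt.item())
            # drop the oldest step, append the new prediction
            window = torch.cat([window[:, 1:, :], nxt.view(1, 1, 1)], dim=1)
    return preds

For example, recursive_forecast(model, x_test[-1:].clone(), 12) would roll the last test window forward twelve months.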
Finally, the main function:
if __name__ == '__main__':
    data()
    train()
    test()
To wrap up, here are the prediction results and the complete code:
import pandas as pd
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def data():
    url = "https://raw.githubusercontent.com/Techtonique/datasets/main/time_series/univariate/AirPassengers.csv"
    save_path = r"D:\Deep Learning Notes\01 Deep Learning for Beginners\00 Data\AirPassengers\AirPassengers.csv"
    r = requests.get(url, allow_redirects=True)
    open(save_path, 'wb').write(r.content)

df = pd.read_csv(r"D:\Deep Learning Notes\01 Deep Learning for Beginners\00 Data\AirPassengers\AirPassengers.csv")
passengers = df.iloc[:, 1].values.reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(passengers)

split = 120
train_data = scaled[:split]
test_data = scaled[split:]

def make_dataset(data, seq_length):
    x, y = [], []
    for i in range(len(data) - seq_length):
        x.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(x), np.array(y)

x_train, y_train = make_dataset(train_data, 12)
x_test, y_test = make_dataset(test_data, 12)

x_train = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
x_test = torch.tensor(x_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).to(device)

class LSTM(nn.Module):
    def __init__(self, num_layers=1):
        super(LSTM, self).__init__()
        self.num_layers = num_layers
        self.lstm = nn.LSTM(1, 64, num_layers, batch_first=True)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        out, _ = self.lstm(x)
        last = out[:, -1, :]
        return self.fc(last)

model = LSTM().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train():
    losses = []
    for epoch in range(1, 1201):
        model.train()
        pred = model(x_train)
        loss = criterion(pred, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        if epoch % 100 == 0:
            print(f'Epoch {epoch}, Loss: {loss.item()}')
    plt.figure(figsize=(10, 6))
    plt.title('Loss')
    plt.plot(losses)
    plt.show()

def test():
    model.eval()
    with torch.no_grad():
        train_pred = model(x_train)
        test_pred = model(x_test)
    train_pred = scaler.inverse_transform(train_pred.cpu().numpy())
    y_train_inv = scaler.inverse_transform(y_train.cpu().numpy())
    test_pred = scaler.inverse_transform(test_pred.cpu().numpy())
    y_test_inv = scaler.inverse_transform(y_test.cpu().numpy())
    full_actual = np.concatenate([y_train_inv, y_test_inv])
    full_pred = np.concatenate([train_pred, test_pred])
    plt.figure(figsize=(10, 6))
    plt.title('Air Passengers')
    plt.plot(full_actual, label='Actual')
    plt.plot(full_pred, label='Predicted')
    plt.legend()
    plt.show()
    print(f'RMSE: {np.sqrt(mean_squared_error(y_test_inv, test_pred))}')
    print(f'MAE: {mean_absolute_error(y_test_inv, test_pred)}')

if __name__ == '__main__':
    data()
    train()
    test()
Time series forecasting is a genuinely complex problem, widely applied in finance, e-commerce, and many other fields; it has to account for trend, seasonality, noise, and more. Long-horizon forecasting, limited data, and high noise remain major challenges. Later I may do a dedicated series on time series forecasting, from classical statistical methods (ARIMA) through machine learning (XGBoost) to today's deep learning (LSTM/Transformer), and also talk about how I went from being extremely fond of deep learning, even thinking classical statistical models were obsolete, to realizing they remain an efficient and indispensable part of the toolkit. The next post will cover GNNs and GCNs, stay tuned!