深度学习入门(3)新冠人数预测
(epoch, epochs, time.time()-start_time, plt_train_loss[-1], plt_val_loss[-1]))#打印当前轮次、耗时、训练损失和验证损失。val_x, val_y = val_x.to(device), val_y.to(device)#将验证数据转移到设备上。return self.X[item].float(), self.Y[ite
·
数据下载地址:ML2021Spring-hw1 | Kaggle
import torch import matplotlib.pyplot as plt #画图 import matplotlib import numpy as np import csv #导入Python内置库,用于读写CSV文件 import pandas #pandas是一个第三方数据分析库,其集成了大量的数据分析工具,可以方便的处理和分析各类数据 from torch.utils.data import Dataset, DataLoader #导入数据加载的核心组件,后续可通过 Dataset自定义数据集,用 DataLoader高效批量读取 #torch.utils.data是PyTorch 中处理数据的子模块 import torch.nn as nn #torch.nn是PyTorch 的神经网络模块,包含层、损失函数等 from torch import optim #从torch根模块导入optim子模块,optim包含优化算法(如 SGD、Adam) import time #直接导入Python标准库time,用于记录训练时间
#定义数据集
class CovidDataset(Dataset): #继承PyTorch的torch.utils.data.Dataset抽象类
def __init__(self, file_path, mode): #数据初始化
with open(file_path, "r") as f: #"r"以只读模式。安全打开文件(使用上下文管理器(with语句)打开文件,确保文件在操作完成后自动关闭),返回文件对象f,用于后续读取操作
ori_data = list(csv.reader(f)) #将文件逐行解析为CSV格式的迭代器,并将 CSV迭代器转换为列表,方便后续索引和切片操作
csv_data = np.array(ori_data)[1:, 1:].astype(float) #去掉第一行和第一列
#逢五取1,不推荐
if mode == "train":
indices = [i for i in range(len(csv_data)) if i % 5 != 0]
#训练集取 4/5 数据
elif mode == "val":
indices = [i for i in range(len(csv_data)) if i % 5 == 0]
#验证集取 1/5 数据
elif mode == "test":
indices = [i for i in range(len(csv_data))]
#测试集取全部数据
X = torch.tensor(csv_data[indices, :93]) #选取指定行和前 93 列,将 NumPy 数组转换为 PyTorch 张量,提取原始特征数据
if mode != "test":
self.Y = torch.tensor(csv_data[indices, -1]) #测试集标签为最后一列
self.X = (X - X.mean(dim=0, keepdim=True)) / X.std(dim=0, keepdim=True)
#将特征数据标准化
self.mode = mode
def __getitem__(self, item):
if self.mode == "test":
return self.X[item].float() #测试集只返回特征
else:
return self.X[item].float(), self.Y[item].float() #训练/验证集返回特征和标签
def __len__(self):
return len(self.X) #返回样本数量,供DataLoader确定迭代次数
#定义模型
class myModel(nn.Module):
def __init__(self, inDim):
super(myModel, self).__init__() #调用父类nn.Module的初始化方法,确保正确初始化模型
self.fc1 = nn.Linear(inDim, 128) #定义全连接层(线性层),输入维度为inDim,输出维度为128
self.relu1 = nn.ReLU() #ReLU激活函数,引入非线性:f(x) = max(0, x)
self.fc2 = nn.Linear(128, 1) #第二个全连接层,将128维特征映射到1维输出
def forward(self, x):
x = self.fc1(x) #输入通过第一个全连接层,输出形状变为(batch_size, 128)
x = self.relu1(x) #对每个元素应用ReLU激活函数,输出形状不变(batch_size,128)
x = self.fc2(x) #通过第二个全连接层,输出形状变为(batch_size, 1)
if len(x.size()) > 1:
x = x.squeeze(1) #统一预测值与标签的维度,如果预测值维度大于1,就去掉第二个维度,压缩为1维
return x
#训练与验证
def train_val(model, train_loader, val_loader, lr, optimizer, device, epochs, save_path):
model = model.to(device) #将模型(包括其参数和缓冲区)转移到device指定的设备
plt_train_loss = [] #记录每轮训练的平均损失
plt_val_loss = [] #记录每轮验证的平均损失
min_val_loss = 999999999999999999.9 #初始化最小验证损失(用于保存最佳模型)
for epoch in range(epochs): #开始训练
model.train() #设置为训练模式(启用 Dropout 和 BatchNorm)
start_time = time.time() #记录开始时间
train_loss = 0.0 #初始化当前轮的训练损失
for x, y in train_loader: #按批次遍历训练数据集
x, y = x.to(device), y.to(device) #将训练数据转移到设备上
y_pred = model(x) #前向传播
bat_loss = loss(y_pred, y, model) #计算损失
bat_loss.backward() #反向传播
optimizer.step() #更新参数
optimizer.zero_grad() #清空梯度
train_loss += bat_loss.cpu().item() #累加本轮各个批次的损失,将张量移动到CPU,并从单元素张量中提取Python的float或int值
plt_train_loss.append(train_loss/train_loader.__len__()) #用本轮总损失 ÷ 批次数量计算平均损失
#append()是列表的内置方法,用于在列表末尾添加一个元素
#验证
model.eval() #设置为评估模式(关闭 Dropout 和 BatchNorm)
val_loss = 0.0
with torch.no_grad(): #禁用梯度,节省内存,验证/测试无需计算梯度更新参数
for val_x, val_y in val_loader: #遍历验证数据
val_x, val_y = val_x.to(device), val_y.to(device) #将验证数据转移到设备上
val_pred_y = model(val_x) #前向传播
val_bat_loss = loss(val_pred_y, val_y, model) #计算损失
val_loss += val_bat_loss.cpu().item() #累加损失
plt_val_loss.append(val_loss / val_loader.__len__()) #计算平均损失
#保存最佳模型
if val_loss < min_val_loss:
min_val_loss = val_loss #若当前验证损失更小,更新最小损失
torch.save(model, save_path) #保存模型
#torch.save():保存一个序列化(serialized)的目标到磁盘。函数使用了Python的pickle程序用于序列化。模型(models),张量(tensors)和文件夹(dictionaries)都是可以用这个函数保存的目标类型
print("[%03d/%03d] %2.2f sec(s) train_loss: %.6f val_loss:%.6f" % \
(epoch, epochs, time.time()-start_time, plt_train_loss[-1], plt_val_loss[-1])) #打印当前轮次、耗时、训练损失和验证损失
plt.plot(plt_train_loss) #绘制训练损失曲线
plt.plot(plt_val_loss) #绘制验证损失曲线
plt.title("loss") #标题
plt.legend(["train", "val"]) #图例,按绘制顺序为图表中的曲线或数据系列添加标签
plt.show() #显示图像
#定义评估层
def evaluate(model_path, test_loader, rel_path, device):
model = torch.load(model_path).to(device) #加载训练好的模型,并将其移动到指定的device
#torch.load() :用来加载模型。torch.load() 使用Python的解压工具(unpickling)来反序列化 pickled object(通过pickle模块序列化后的Python对象)到对应存储设备上
rel = [] #记录预测结果
model.eval() #将模型设置为评估模式
with torch.no_grad(): #禁用梯度
for x in test_loader: #遍历测试数据集
x = x.to(device) #将数据移动到指定设备
pred = model(x) #前向传播,得到预测结果
rel.append(pred.cpu().item()) #将预测结果移回CPU并转换为Python标量,存入列表
with open(rel_path, "w", newline="") as f: #"w"以写入模式打开 rel_path指定CSV文件,newline=""避免空行
csv_writer = csv.writer(f) #初始化写入器
csv_writer.writerow(["id", "tested_positive"]) #写入表头:设定好列名
for i, pred in enumerate(rel): #遍历预测结果,同时获取索引i和预测值pred
csv_writer.writerow([str(i), str(pred)]) #写入每行数据
print("结果保存到了"+rel_path)
#定义损失函数
def mseLoss(pred, target, model):
loss = nn.MSELoss(reduction='mean') #均方误差损失。
#reduction='mean'表示对批量样本的损失取平均值;'sum'为计算所有样本损失的总和;'none'为返回每个样本的损失(形状为 (batch_size,)的张量。
regularization_loss = 0 # 设正则化项初值为0
for param in model.parameters(): #遍历模型中所有需要计算梯度的参数(可训练参数)
regularization_loss += torch.sum(param ** 2) #计算所有参数平,添加L2正则化(权重衰减)防止过拟合
return loss(pred, target) + 0.00075 * regularization_loss #总损失 = 均方误差 + 正则化项,正则化系数设为0.00075
#主程序 train_file = r"E:\zhuqian\ml2021spring-hw1\covid.train.csv" #训练集路径 test_file = r"E:\zhuqian\ml2021spring-hw1\covid.test.csv" #测试集路径 batch_size = 16 #步长设为16 train_set = CovidDataset(train_file, "train") #训练集 val_set = CovidDataset(train_file, "val") #验证集 test_set = CovidDataset(test_file, "test") #测试集 train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True) val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True) test_loader = DataLoader(test_set, batch_size=1, shuffle=False) #测试集不能打乱顺序,否则提交的结果是乱序的 #DataLoader()将数据集包装为可迭代的数据加载器 #shuffle=True:每个epoch打乱数据顺序,防止模型记忆样本顺序
loss = mseLoss #将loss变量指向了mseLoss函数,后续调用 loss()等价于调用mseLoss() epochs = 20 #训练轮次 lr = 0.001 #学习率 device = "cuda" if torch.cuda.is_available() else "cpu" #自动检测当前环境是否支持 cuda(即是否有可用的 NVIDIA GPU),没有则使用 CPU进行计算 print(device)
data_dim = 93 #输入特征维度 model = myModel(data_dim).to(device) #实例化模型并移至指定设备 save_path = "model_save/best_model.pth" #模型保存路径 rel_path = "pred.csv" #测试集预测结果保存路径 optimizer = optim.SGD(params=model.parameters(), lr=lr, momentum=0.9) #优化器,动量为0.9 #动量可以加速收敛并减少震荡,通过引入“惯性”来平滑参数更新
#训练模型 train_val(model, train_loader, val_loader, lr, optimizer, device, epochs, save_path) #测试并提交结果 evaluate(save_path, test_loader, rel_path, device)
运行结果:


更多推荐



所有评论(0)