A Deep Dive into PyTorch Model Training and Visualization
Part 1: PyTorch Training Basics and Data Visualization
1. Model Training and Visualization
1.1 The Model Training Workflow Template
Training a model requires the following components (a minimal skeleton tying them together follows this list):
- Dataset: specifies which samples form the training set and which form the test set
- DataLoader: specifies how the data is loaded (batch size, shuffling, worker processes)
- Model: defines the network architecture
- Loss function: defines how to measure the gap between predictions and the true labels
- Optimizer: defines how the network parameters are updated
- Saving: stores the trained parameters, plus any information logged during training
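Here is a minimal, self-contained sketch of that template. The toy dataset of random tensors is an assumption for illustration only; the sections below use FashionMNIST instead.

import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

# Dataset: 256 random 8-dimensional samples with binary labels (toy data)
features = torch.randn(256, 8)
labels = torch.randint(0, 2, (256,))
dataset = TensorDataset(features, labels)

# DataLoader: how the data is batched and served
loader = DataLoader(dataset, batch_size=32, shuffle=True)

# Model: the network architecture
model = nn.Sequential(nn.Linear(8, 16), nn.ReLU(), nn.Linear(16, 2))

# Loss function: how far the predictions are from the labels
loss_fn = nn.CrossEntropyLoss()

# Optimizer: how the parameters are updated
optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)

# One training pass over the data
for xb, yb in loader:
    loss = loss_fn(model(xb), yb)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

# Saving: persist the trained parameters
torch.save(model.state_dict(), "toy_model.pth")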
1.2 Running an Official PyTorch Demo
# Import the required PyTorch modules
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

# ======================
# 1. Load the datasets
# ======================
# Download and load the FashionMNIST training set
# root: where the data is stored
# train=True loads the training split
# download=True downloads the data if it is not found locally
# transform=ToTensor() converts PIL images / NumPy arrays to FloatTensors in [0, 1] and reorders HWC to CHW
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Load the test set (train=False)
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# ======================
# 2. Create the data loaders
# ======================
batch_size = 64  # number of samples per batch

# Wrap the datasets in DataLoaders for batching, shuffling, multi-process loading, etc.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Print the shape of one batch to verify the data loads correctly
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")  # N: batch size, C: channels (1), H/W: 28x28
    print(f"Shape of y: {y.shape} {y.dtype}")     # y holds the class labels (0-9) as longs
    break

# ======================
# 3. Select the device (GPU or CPU)
# ======================
# Prefer a CUDA GPU when available, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# ======================
# 4. Define the neural network
# ======================
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        # Flatten the 28x28 image into a 784-dimensional vector
        self.flatten = nn.Flatten()
        # A stack of three fully connected layers with ReLU activations
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),  # input 784 -> output 512
            nn.ReLU(),                # non-linear activation
            nn.Linear(512, 512),      # hidden layer
            nn.ReLU(),
            nn.Linear(512, 10)        # output layer: 10 classes (FashionMNIST has 10)
            # Note: no Softmax here, because CrossEntropyLoss applies it internally
        )

    def forward(self, x):
        # Forward pass: flatten the input, then run it through the linear stack
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits  # raw, unnormalized scores (logits)

# Instantiate the model and move it to the selected device (GPU/CPU)
model = NeuralNetwork().to(device)
print(model)  # print the model architecture

# ======================
# 5. Define the loss function and optimizer
# ======================
# Cross-entropy loss, suitable for multi-class classification
loss_fn = nn.CrossEntropyLoss()
# Stochastic gradient descent (SGD) with a learning rate of 0.001
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# ======================
# 6. Define the training function
# ======================
def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)  # total number of samples
    model.train()  # switch to training mode (enables Dropout, BatchNorm updates, etc.)
    for batch, (X, y) in enumerate(dataloader):
        # Move the data to the selected device
        X, y = X.to(device), y.to(device)
        # Forward pass: compute predictions
        pred = model(X)
        # Compute the loss (predictions vs. ground-truth labels)
        loss = loss_fn(pred, y)
        # Backward pass + parameter update
        loss.backward()        # compute gradients
        optimizer.step()       # update parameters
        optimizer.zero_grad()  # reset gradients (prevents accumulation)
        # Print progress every 100 batches
        if batch % 100 == 0:
            loss_val = loss.item()
            current = (batch + 1) * len(X)  # samples processed so far
            print(f"loss: {loss_val:>7f} [{current:>5d}/{size:>5d}]")

# ======================
# 7. Define the test function
# ======================
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)  # total number of test samples
    num_batches = len(dataloader)   # number of batches
    model.eval()  # switch to evaluation mode (disables Dropout, etc.)
    test_loss = 0
    correct = 0
    # Disable gradient tracking (saves memory, speeds up inference)
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            # Accumulate the loss
            test_loss += loss_fn(pred, y).item()
            # Count correct predictions
            # pred.argmax(1) returns the index of the largest value per row (the predicted class)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    # Average loss and accuracy
    test_loss /= num_batches
    correct /= size  # convert to a ratio in [0, 1]
    print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

# ======================
# 8. Run the training loop
# ======================
epochs = 5  # number of training epochs
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)  # train for one epoch
    test(test_dataloader, model, loss_fn)               # evaluate once
print("Done!")

# Save the trained weights
torch.save(model.state_dict(), "model.pth")
print("Saved successfully!")

# Reload the weights into a fresh model instance
model = NeuralNetwork().to(device)
model.load_state_dict(torch.load("model.pth"))

classes = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

# Run a single prediction on the first test sample
model.eval()
x, y = test_data[0][0], test_data[0][1]
with torch.no_grad():
    x = x.to(device)
    pred = model(x)
    predicted, actual = classes[pred[0].argmax(0)], classes[y]
    print(f"predicted={predicted}, actual={actual}")
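A side note on the "no Softmax here" comment above: CrossEntropyLoss is equivalent to LogSoftmax followed by NLLLoss, which you can verify with a few made-up logits (the values below are arbitrary):

import torch
from torch import nn

logits = torch.tensor([[2.0, 0.5, -1.0]])  # arbitrary raw model outputs
target = torch.tensor([0])

ce = nn.CrossEntropyLoss()(logits, target)
nll = nn.NLLLoss()(nn.LogSoftmax(dim=1)(logits), target)
print(ce.item(), nll.item())  # both print the same value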
1.3 The Training Function
What the training function does
A complete training function covers the following steps (the accuracy bookkeeping it relies on is illustrated right after this list):
- Read data from the DataLoader, feed it to the model, and collect the output
- Compare the output with the ground-truth labels to compute the loss
- Backpropagate the loss and let the optimizer update the parameters
- Return the loss value
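The refactored train below also tracks accuracy with (pred.argmax(1) == y). A tiny standalone example of what that expression computes (the tensors are made up):

import torch

pred = torch.tensor([[0.1, 2.0, 0.3],   # predicted class 1
                     [1.5, 0.2, 0.1],   # predicted class 0
                     [0.0, 0.1, 3.0]])  # predicted class 2
y = torch.tensor([1, 2, 2])             # ground-truth labels

# pred.argmax(1) is tensor([1, 0, 2]); comparing with y gives [True, False, True]
correct = (pred.argmax(1) == y).type(torch.float).sum().item()
print(correct)  # 2.0 -> two of the three samples are classified correctly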
The code after optimizing train:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm

# Training set
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)
# Test set
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)
# Data loaders
batch_size = 64
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Select the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Define the model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

# Loss function
loss_fn = nn.CrossEntropyLoss()
# Optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Training function
def train(dataloader, model, loss_fn, optimizer):
    # Dataset size and number of batches
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Switch to training mode
    model.train()
    # Running totals for loss and correct predictions
    loss_total = 0
    correct = 0
    # Iterate over every batch in the data loader
    for X, y in tqdm(dataloader):
        # Move data and labels to the selected device (e.g. the GPU)
        X, y = X.to(device), y.to(device)
        # Forward pass
        pred = model(X)
        # Count correct predictions
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        # Loss between predictions and ground truth
        loss = loss_fn(pred, y)
        # Accumulate the total loss
        loss_total += loss.item()
        # Backward pass: compute gradients
        loss.backward()
        # Update the parameters
        optimizer.step()
        # Reset the gradients
        optimizer.zero_grad()
    # Average loss and accuracy
    loss_avg = loss_total / num_batches
    correct /= size
    # Return accuracy and average loss, rounded to three decimals
    return round(correct, 3), round(loss_avg, 3)

# Test function
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

train_acc_list = []
train_loss_list = []
# Number of epochs; each epoch trains first, then tests
epochs = 5
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
    train_acc_list.append(train_acc)
    train_loss_list.append(train_loss)
    test(test_dataloader, model, loss_fn)
print("Done!")

# ===== Add the following two lines at the top of the file, or right before plotting =====
import matplotlib
matplotlib.use('TkAgg')  # avoids backend conflicts in PyCharm
import matplotlib.pyplot as plt

x_list = [i + 1 for i in range(len(train_acc_list))]
plt.plot(x_list, train_acc_list, label="Train")
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

plt.plot(x_list, train_loss_list, label="Train")
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()
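If you run this on a headless machine (or the TkAgg backend still misbehaves), a common alternative is to write the figures to disk instead of opening windows. A minimal sketch, with made-up accuracy values:

import matplotlib
matplotlib.use("Agg")  # non-interactive backend, needs no display
import matplotlib.pyplot as plt

plt.plot([1, 2, 3, 4, 5], [0.60, 0.70, 0.75, 0.78, 0.80], label="Train")
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.savefig("accuracy.png")  # saved to disk instead of shown in a window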
1.4 The Test Function
A complete test function covers the following steps (the effect of the torch.no_grad() context it uses is shown right after this list):
- Read data from the DataLoader, feed it to the model, and collect the output
- Compare the output with the ground-truth labels to compute the loss
- Return the loss value together with the accuracy
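The test function wraps inference in torch.no_grad(), which disables gradient tracking. A small self-contained check of what that changes:

import torch
from torch import nn

model = nn.Linear(4, 2)
x = torch.randn(1, 4)

out = model(x)
print(out.requires_grad)  # True: a computation graph is built during training

with torch.no_grad():
    out = model(x)
print(out.requires_grad)  # False: no graph, so inference is cheaper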
The code after optimizing both train and test:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm

# Training set
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)
# Test set
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)
# Data loaders
batch_size = 64
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Select the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

# Define the model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

model = NeuralNetwork().to(device)
print(model)

# Loss function
loss_fn = nn.CrossEntropyLoss()
# Optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Training function
def train(dataloader, model, loss_fn, optimizer):
    # Dataset size and number of batches
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Switch to training mode
    model.train()
    # Running totals for loss and correct predictions
    loss_total = 0
    correct = 0
    # Iterate over every batch in the data loader
    for X, y in tqdm(dataloader):
        # Move data and labels to the selected device (e.g. the GPU)
        X, y = X.to(device), y.to(device)
        # Forward pass
        pred = model(X)
        # Count correct predictions
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        # Loss between predictions and ground truth
        loss = loss_fn(pred, y)
        # Accumulate the total loss
        loss_total += loss.item()
        # Backward pass: compute gradients
        loss.backward()
        # Update the parameters
        optimizer.step()
        # Reset the gradients
        optimizer.zero_grad()
    # Average loss and accuracy
    loss_avg = loss_total / num_batches
    correct /= size
    # Return accuracy and average loss, rounded to three decimals
    return round(correct, 3), round(loss_avg, 3)

# Test function
def test(dataloader, model, loss_fn):
    # Dataset size and number of batches
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Switch to evaluation mode
    model.eval()
    # Running totals for loss and correct predictions
    test_loss, correct = 0, 0
    # Skip gradient tracking: faster and uses less memory
    with torch.no_grad():
        # Iterate over every batch in the data loader
        for X, y in tqdm(dataloader):
            # Move data and labels to the selected device (e.g. the GPU)
            X, y = X.to(device), y.to(device)
            # Forward pass
            pred = model(X)
            # Accumulate the loss
            test_loss += loss_fn(pred, y).item()
            # Count correct predictions
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    # Average loss and accuracy
    test_loss /= num_batches
    correct /= size
    # Return accuracy and average loss, rounded to three decimals
    return round(correct, 3), round(test_loss, 3)

train_acc_list = []
train_loss_list = []
test_acc_list = []
test_loss_list = []
# Number of epochs; each epoch trains first, then tests
epochs = 5
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
    train_acc_list.append(train_acc)
    train_loss_list.append(train_loss)
    test_acc, test_loss = test(test_dataloader, model, loss_fn)
    test_acc_list.append(test_acc)
    test_loss_list.append(test_loss)
print("Done!")

# ===== Add the following two lines at the top of the file, or right before plotting =====
import matplotlib
matplotlib.use('TkAgg')  # avoids backend conflicts in PyCharm
import matplotlib.pyplot as plt

x_list = [i + 1 for i in range(len(train_acc_list))]
plt.plot(x_list, train_acc_list, label="Train")
plt.plot(x_list, test_acc_list, label="Test")
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

plt.plot(x_list, train_loss_list, label="Train")
plt.plot(x_list, test_loss_list, label="Test")
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()
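One detail worth noting: the loaders in these scripts never shuffle, so every epoch sees the training batches in the same order. Passing shuffle=True to the training loader is the usual practice and generally helps SGD; a minimal sketch (the test loader stays deterministic so results are reproducible):

from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

training_data = datasets.FashionMNIST(root="data", train=True, download=True, transform=ToTensor())
test_data = datasets.FashionMNIST(root="data", train=False, download=True, transform=ToTensor())

# shuffle=True reorders the training samples each epoch
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=False)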
1.5 The Main Training Function
What the main function does
A complete main function covers the following (the log-line format it writes out is illustrated right after this list):
- Define the datasets and data loaders used for training and testing
- Define the model architecture
- Define the loss function and optimizer
- Run the training and test functions
- Log the run information and save the results
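For reference, each call to writedata below appends one tab-separated line to logs/log.txt, and plot_txt recovers the numbers by splitting on tabs and then on ':'. A quick sketch with illustrative (made-up) values:

# One illustrative line of logs/log.txt (the numbers are made up):
line = "Epoch:1\ttrain_accuracy:0.75\ttrain_loss:1.2\ttest_accuracy:0.74\ttest_loss:1.3"

fields = line.split("\t")
epoch = int(fields[0].split(":")[1])
train_acc = float(fields[1].split(":")[1])
print(epoch, train_acc)  # 1 0.75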
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm
import matplotlib
matplotlib.use('TkAgg')  # avoids backend conflicts in PyCharm
import matplotlib.pyplot as plt
import os

# Define the model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

# Training function
def train(dataloader, model, loss_fn, optimizer):
    # Dataset size and number of batches
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Switch to training mode
    model.train()
    # Running totals for loss and correct predictions
    loss_total = 0
    correct = 0
    # Iterate over every batch in the data loader
    for X, y in tqdm(dataloader):
        # Move data and labels to the selected device (e.g. the GPU)
        X, y = X.to(device), y.to(device)
        # Forward pass
        pred = model(X)
        # Count correct predictions
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()
        # Loss between predictions and ground truth
        loss = loss_fn(pred, y)
        # Accumulate the total loss
        loss_total += loss.item()
        # Backward pass: compute gradients
        loss.backward()
        # Update the parameters
        optimizer.step()
        # Reset the gradients
        optimizer.zero_grad()
    # Average loss and accuracy
    loss_avg = loss_total / num_batches
    correct /= size
    # Return accuracy and average loss, rounded to three decimals
    return round(correct, 3), round(loss_avg, 3)

# Test function
def test(dataloader, model, loss_fn):
    # Dataset size and number of batches
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    # Switch to evaluation mode
    model.eval()
    # Running totals for loss and correct predictions
    test_loss, correct = 0, 0
    # Skip gradient tracking: faster and uses less memory
    with torch.no_grad():
        # Iterate over every batch in the data loader
        for X, y in tqdm(dataloader):
            # Move data and labels to the selected device (e.g. the GPU)
            X, y = X.to(device), y.to(device)
            # Forward pass
            pred = model(X)
            # Accumulate the loss
            test_loss += loss_fn(pred, y).item()
            # Count correct predictions
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    # Average loss and accuracy
    test_loss /= num_batches
    correct /= size
    # Return accuracy and average loss, rounded to three decimals
    return round(correct, 3), round(test_loss, 3)

def writedata(txt_log_name, epoch, train_accuracy, train_loss, test_accuracy, test_loss):
    # Append one tab-separated line per epoch ("a+" keeps extending the file across runs)
    with open(txt_log_name, "a+") as f:
        f.write(f"Epoch:{epoch}\ttrain_accuracy:{train_accuracy}\ttrain_loss:{train_loss}\ttest_accuracy:{test_accuracy}\ttest_loss:{test_loss}\n")

def plot_txt(log_txt_loc):
    with open(log_txt_loc, 'r') as f:
        log_data = f.read()
    # Parse the log lines back into lists
    epochs = []
    train_accuracies = []
    train_losses = []
    test_accuracies = []
    test_losses = []
    for line in log_data.strip().split('\n'):
        epoch, train_acc, train_loss, test_acc, test_loss = line.split('\t')
        epochs.append(int(epoch.split(':')[1]))
        train_accuracies.append(float(train_acc.split(':')[1]))
        train_losses.append(float(train_loss.split(':')[1]))
        test_accuracies.append(float(test_acc.split(':')[1]))
        test_losses.append(float(test_loss.split(':')[1]))
    # Draw the curves
    plt.figure(figsize=(10, 5))
    # Accuracy curves (train and test)
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_accuracies, label='Train Accuracy')
    plt.plot(epochs, test_accuracies, label='Test Accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Value')
    plt.legend()
    # Integer ticks on the x axis
    plt.xticks(range(min(epochs), max(epochs) + 1))
    # Loss curves (train and test)
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_losses, label='Train Loss')
    plt.plot(epochs, test_losses, label='Test Loss')
    plt.title('Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Value')
    plt.legend()
    # Integer ticks on the x axis
    plt.xticks(range(min(epochs), max(epochs) + 1))
    plt.tight_layout()
    plt.show()

if __name__ == '__main__':
    log_root = "logs"
    log_txt_loc = os.path.join(log_root, "log.txt")
    if not os.path.isdir(log_root):
        os.mkdir(log_root)
    # Training set
    training_data = datasets.FashionMNIST(
        root="data",
        train=True,
        download=True,
        transform=ToTensor(),
    )
    # Test set
    test_data = datasets.FashionMNIST(
        root="data",
        train=False,
        download=True,
        transform=ToTensor(),
    )
    # Data loaders
    batch_size = 64
    train_dataloader = DataLoader(training_data, batch_size=batch_size)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)
    for X, y in test_dataloader:
        print(f"Shape of X [N, C, H, W]: {X.shape}")
        print(f"Shape of y: {y.shape} {y.dtype}")
        break
    # Select the device
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device")
    model = NeuralNetwork().to(device)
    print(model)
    # Loss function
    loss_fn = nn.CrossEntropyLoss()
    # Optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    best_acc = 0
    # Number of epochs; each epoch trains first, then tests
    epochs = 5
    for t in range(epochs):
        print(f"Epoch {t + 1}\n-------------------------------")
        train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
        test_acc, test_loss = test(test_dataloader, model, loss_fn)
        # Log 1-based epoch numbers so the file matches the printed progress
        writedata(log_txt_loc, t + 1, train_acc, train_loss, test_acc, test_loss)
        # Keep the best model (by test accuracy), and always keep the latest one
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), os.path.join(log_root, "best.pth"))
        torch.save(model.state_dict(), os.path.join(log_root, "last.pth"))
    print("Done!")
    plot_txt(log_txt_loc)
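Finally, a minimal sketch of reusing the saved best checkpoint for inference. It assumes the NeuralNetwork class from the script above is in scope, and the input here is just a dummy FashionMNIST-shaped tensor:

import os
import torch

model = NeuralNetwork()  # assumes the class defined in the script above
model.load_state_dict(torch.load(os.path.join("logs", "best.pth"), map_location="cpu"))
model.eval()

x = torch.randn(1, 1, 28, 28)  # dummy input shaped like one FashionMNIST image
with torch.no_grad():
    pred = model(x)
print(pred.argmax(1).item())  # index of the predicted class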
