WikiWiki
首页
Java开发
Java面试
Linux手册
  • AI相关
  • Python Flask
  • Pytorch
  • youlo8
SEO
uniapp小程序
Vue前端
work
数据库
软件设计师
CICD
入门指南
首页
Java开发
Java面试
Linux手册
  • AI相关
  • Python Flask
  • Pytorch
  • youlo8
SEO
uniapp小程序
Vue前端
work
数据库
软件设计师
CICD
入门指南

深入PyTorch模型的训练与可视化

一、PyTorch训练基础与数据可视化

1. 模型训练与可视化

1.1模型训练的流程模板

训练模型,需要有以下内容

  • 数据集:指定哪些是训练集、测试集
  • 数据加载器:指定如何加载
  • 数据模型:定义模型结构
  • 损失函数:定义如何计算相差多少
  • 优化器:定义如何更新网络参数
  • 数据保存:保存训练好的参数,以及训练当中的信息

1.2.跑通一个PyTorch官方demo

  • 跑通一个PyTorch官方demo

    # 导入必要的 PyTorch 模块
    import torch
    from torch import nn
    from torch.utils.data import DataLoader
    from torchvision import datasets
    from torchvision.transforms import ToTensor
    
    # ======================
    # 1. Load the datasets
    # ======================
    
    # Download and load the FashionMNIST training split
    # root: directory where the data is stored
    # train=True selects the training split
    # download=True downloads the data automatically if it is not present locally
    # transform=ToTensor() converts a PIL image or NumPy array to a FloatTensor in [0, 1] and changes HWC layout to CHW
    training_data = datasets.FashionMNIST(
        root="data",
        train=True,
        download=True,
        transform=ToTensor(),
    )
    
    # Load the test split (train=False)
    test_data = datasets.FashionMNIST(
        root="data",
        train=False,
        download=True,
        transform=ToTensor(),
    )
    
    # ======================
    # 2. Create the data loaders
    # ======================
    
    batch_size = 64  # number of samples per batch
    
    # Wrap the datasets in DataLoaders (batching, optional shuffling, parallel loading, ...);
    # only batch_size is set here, every other option keeps its default
    train_dataloader = DataLoader(training_data, batch_size=batch_size)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)
    
    # Print the shape of a single batch to verify that loading works
    for X, y in test_dataloader:
        print(f"Shape of X [N, C, H, W]: {X.shape}")  # N: batch size, C: channels (1), H/W: 28x28
        print(f"Shape of y: {y.shape} {y.dtype}")  # y holds the class labels (0-9), dtype long
        break
    
    # ======================
    # 3. Select the device (GPU or CPU)
    # ======================
    
    # Pick the device automatically: CUDA GPU if available, otherwise CPU
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using {device} device")
    
    
    # ======================
    # 4. 定义神经网络模型
    # ======================
    
    class NeuralNetwork(nn.Module):
        """Fully connected classifier: flatten, then 784 -> 512 -> 512 -> 10.

        The forward pass returns raw logits. No softmax layer is included
        because the script feeds the output to nn.CrossEntropyLoss.
        """

        def __init__(self):
            super().__init__()
            # Turns each 28x28 image into a 784-element vector.
            self.flatten = nn.Flatten()
            in_features, hidden, num_classes = 28 * 28, 512, 10
            layers = [
                nn.Linear(in_features, hidden),  # input layer: 784 -> 512
                nn.ReLU(),
                nn.Linear(hidden, hidden),  # hidden layer
                nn.ReLU(),
                nn.Linear(hidden, num_classes),  # output layer: one score per class
            ]
            self.linear_relu_stack = nn.Sequential(*layers)

        def forward(self, x):
            """Flatten x and return the unnormalized class scores (logits)."""
            return self.linear_relu_stack(self.flatten(x))
    
    
    # Instantiate the model and move it to the selected device (GPU/CPU)
    model = NeuralNetwork().to(device)
    print(model)  # print the model structure
    
    # ======================
    # 5. Define the loss function and the optimizer
    # ======================
    
    # Cross-entropy loss (suited to multi-class classification)
    loss_fn = nn.CrossEntropyLoss()
    
    # Stochastic gradient descent (SGD) optimizer with a learning rate of 0.001
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    
    
    # ======================
    # 6. 定义训练函数
    # ======================
    
    def train(dataloader, model, loss_fn, optimizer):
        """Run one training pass over dataloader, printing the loss every 100 batches.

        Batches are moved to the module-level ``device`` before the forward
        pass. Nothing is returned; progress is reported on stdout only.
        """
        size = len(dataloader.dataset)  # total number of samples
        model.train()  # switch to training mode

        for step, (inputs, targets) in enumerate(dataloader, start=1):
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Forward pass and loss against the true labels
            loss = loss_fn(model(inputs), targets)

            # Backward pass, parameter update, then reset the gradients so
            # they do not accumulate into the next batch
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            # Only the 1st, 101st, 201st, ... batch reports progress
            if (step - 1) % 100 != 0:
                continue
            loss_val = loss.item()
            current = step * len(inputs)  # samples processed so far
            print(f"loss: {loss_val:>7f}  [{current:>5d}/{size:>5d}]")
    
    
    # ======================
    # 7. 定义测试函数
    # ======================
    
    def test(dataloader, model, loss_fn):
        """Evaluate model on dataloader and print accuracy and mean batch loss.

        Batches are moved to the module-level ``device``. Nothing is returned.
        """
        size = len(dataloader.dataset)  # number of samples in the dataset
        num_batches = len(dataloader)  # number of batches
        model.eval()  # switch to evaluation mode

        batch_losses = []
        hits = 0

        # No gradients are needed for evaluation
        with torch.no_grad():
            for inputs, targets in dataloader:
                inputs = inputs.to(device)
                targets = targets.to(device)
                scores = model(inputs)

                batch_losses.append(loss_fn(scores, targets).item())

                # argmax(1) picks the index of the largest score in each row,
                # i.e. the predicted class
                hits += (scores.argmax(1) == targets).float().sum().item()

        # Mean loss per batch and fraction of correct predictions (0-1)
        test_loss = sum(batch_losses) / num_batches
        correct = hits / size

        print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
    
    # ======================
    # 8. 开始训练循环
    # ======================
    
    epochs = 5  # number of training epochs
    for t in range(epochs):
        print(f"Epoch {t + 1}\n-------------------------------")
        train(train_dataloader, model, loss_fn, optimizer)  # one training pass
        test(test_dataloader, model, loss_fn)  # evaluate on the test set

    print("Done!")


    # Persist only the learned parameters (state_dict), not the module object.
    torch.save(model.state_dict(), "model.pth")
    print("保存成功!")

    # Rebuild the architecture and load the saved parameters into it. The class
    # has to be instantiated first: .to() is an instance method.
    model = NeuralNetwork().to(device)
    # map_location lets the checkpoint load even if it was saved on a device
    # that is not available in the current process.
    model.load_state_dict(torch.load("model.pth", map_location=device))

    # Human-readable names for the ten class indices. Every entry needs its own
    # trailing comma, otherwise adjacent string literals are concatenated and
    # all later labels shift by one.
    classes = [
        "T-shirt/top",
        "Trouser",
        "Pullover",
        "Dress",
        "Coat",
        "Sandal",
        "Shirt",
        "Sneaker",
        "Bag",
        "Ankle boot",
    ]

    # Classify the first test sample and compare with its true label.
    model.eval()
    x, y = test_data[0][0], test_data[0][1]
    with torch.no_grad():
        x = x.to(device)
        pred = model(x)
        # Look up each label separately: predicted class from the highest
        # score, actual class from the dataset label.
        predicted, actual = classes[pred[0].argmax(0)], classes[y]
        print(f"predicted={predicted},actual={actual}")
    

1.3.训练函数

训练函数的作用

  • 完整的训练函数包含以下功能:

    从数据加载器中读取数据,传入模型,获取输出

    将输出和真实标签进行比较,获取误差

    使用优化器,将误差进行反向传播

    返回误差值

优化train后的代码

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm

# Training set
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Test set
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# Create the data loaders
batch_size = 64

train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Print the shape and label dtype of the first test batch, then stop
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Select the device: CUDA if available, otherwise CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device} device")


# 定义模型
class NeuralNetwork(nn.Module):
    """Three-layer fully connected network mapping 28x28 inputs to 10 logits."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        input_dim = 28 * 28
        width = 512
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_dim, width),
            nn.ReLU(),
            nn.Linear(width, width),
            nn.ReLU(),
            nn.Linear(width, 10),
        )

    def forward(self, x):
        """Return the raw class scores for the batch x."""
        flat = self.flatten(x)
        return self.linear_relu_stack(flat)


# Build the model and move it to the selected device
model = NeuralNetwork().to(device)
print(model)

# Loss function
loss_fn = nn.CrossEntropyLoss()
# Optimizer: SGD with learning rate 1e-3
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)


# # 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
    """Train model for one epoch and report how it did.

    Batches are moved to the module-level ``device`` and iterated under a
    tqdm progress bar.

    Returns:
        (accuracy, mean_batch_loss), each rounded to three decimals.
        Accuracy is the fraction of samples predicted correctly during the
        epoch.
    """
    sample_count = len(dataloader.dataset)
    batch_count = len(dataloader)
    model.train()  # training mode

    running_loss = 0
    hits = 0
    for inputs, targets in tqdm(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        logits = model(inputs)
        # Count predictions whose highest-scoring class equals the label
        hits += (logits.argmax(1) == targets).float().sum().item()

        batch_loss = loss_fn(logits, targets)
        running_loss += batch_loss.item()

        # Backpropagate, update the parameters, then clear the gradients
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    mean_loss = running_loss / batch_count
    accuracy = hits / sample_count
    return round(accuracy, 3), round(mean_loss, 3)

# 定义测试函数
def test(dataloader, model, loss_fn):
    """Evaluate model on dataloader and print accuracy and mean batch loss.

    Batches are moved to the module-level ``device``. Nothing is returned.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()  # evaluation mode

    batch_losses = []
    hits = 0
    with torch.no_grad():  # gradients are not needed for evaluation
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            scores = model(inputs)
            batch_losses.append(loss_fn(scores, targets).item())
            hits += (scores.argmax(1) == targets).float().sum().item()

    test_loss = sum(batch_losses) / num_batches
    correct = hits / size
    print(f"Test Error: \n Accuracy: {(100 * correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")


# Per-epoch training accuracy and loss, collected for plotting
train_acc_list = []
train_loss_list = []

# Number of epochs; each epoch trains first, then tests
epochs = 5
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
    train_acc_list.append(train_acc)
    train_loss_list.append((train_loss))
    test(test_dataloader, model, loss_fn)
print("Done!")
# ===== Select the matplotlib backend before pyplot is imported =====
import matplotlib
matplotlib.use('TkAgg')  # avoid backend conflicts in PyCharm
import matplotlib.pyplot as plt

# 1-based epoch numbers for the x axis
x_list = [i+1 for i in range(len(train_acc_list))]
# Training accuracy per epoch
plt.plot(x_list, train_acc_list, label="Train")
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# Training loss per epoch, drawn as a second figure
plt.plot(x_list, train_loss_list, label="Train")
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

1.4.测试函数

  • 完整的测试函数包含以下功能:

    从数据加载器中读取数据,传入模型,获取输出

    将输出和真实标签进行比较,获取误差

    返回误差值,以及准确度

优化train和test后的代码

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm

# Training set
training_data = datasets.FashionMNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Test set
test_data = datasets.FashionMNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor(),
)

# Create the data loaders
batch_size = 64

train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Print the shape and label dtype of the first test batch, then stop
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

# Select the device: CUDA if available, otherwise CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using {device} device")


# 定义模型
class NeuralNetwork(nn.Module):
    """MLP classifier: Flatten -> Linear(784, 512) -> ReLU -> Linear(512, 512) -> ReLU -> Linear(512, 10)."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )
        self.linear_relu_stack = stack

    def forward(self, x):
        """Flatten x, run it through the layer stack and return the logits."""
        return self.linear_relu_stack(self.flatten(x))


# Build the model and move it to the selected device
model = NeuralNetwork().to(device)
print(model)

# Loss function
loss_fn = nn.CrossEntropyLoss()
# Optimizer: SGD with learning rate 1e-3
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)


# # 定义训练函数
def train(dataloader, model, loss_fn, optimizer):
    """Run a single training epoch.

    Each batch is moved to the module-level ``device``; a tqdm bar tracks
    progress through the loader.

    Returns:
        Tuple ``(accuracy, average_loss)`` rounded to three decimals, where
        accuracy is correct predictions divided by dataset size and
        average_loss is the summed batch loss divided by the batch count.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.train()  # training mode

    loss_sum = 0
    n_correct = 0
    for features, labels in tqdm(dataloader):
        features, labels = features.to(device), labels.to(device)

        outputs = model(features)
        matches = outputs.argmax(1) == labels
        n_correct += matches.to(torch.float).sum().item()

        loss = loss_fn(outputs, labels)
        loss_sum += loss.item()

        loss.backward()  # compute gradients
        optimizer.step()  # update parameters
        optimizer.zero_grad()  # clear gradients for the next batch

    average_loss = loss_sum / n_batches
    accuracy = n_correct / n_samples
    return round(accuracy, 3), round(average_loss, 3)

# 定义测试函数
def test(dataloader, model, loss_fn):
    """Evaluate model on dataloader without updating it.

    Each batch is moved to the module-level ``device``; a tqdm bar tracks
    progress through the loader.

    Returns:
        Tuple ``(accuracy, average_loss)`` rounded to three decimals.
    """
    n_samples = len(dataloader.dataset)
    n_batches = len(dataloader)
    model.eval()  # evaluation mode

    batch_losses = []
    n_correct = 0

    # Gradients are not needed here, which saves memory and time
    with torch.no_grad():
        for features, labels in tqdm(dataloader):
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            batch_losses.append(loss_fn(outputs, labels).item())
            matches = outputs.argmax(1) == labels
            n_correct += matches.to(torch.float).sum().item()

    average_loss = sum(batch_losses) / n_batches
    accuracy = n_correct / n_samples
    return round(accuracy, 3), round(average_loss, 3)


# Per-epoch training metrics, collected for plotting
train_acc_list = []
train_loss_list = []

# Per-epoch test metrics, collected for plotting
test_acc_list = []
test_loss_list = []



# Number of epochs; each epoch trains first, then tests
epochs = 5
for t in range(epochs):
    print(f"Epoch {t + 1}\n-------------------------------")
    train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
    train_acc_list.append(train_acc)
    train_loss_list.append(train_loss)
    test_acc, test_loss = test(test_dataloader, model, loss_fn)
    test_acc_list.append(test_acc)
    test_loss_list.append(test_loss)
print("Done!")


# ===== Select the matplotlib backend before pyplot is imported =====
import matplotlib
matplotlib.use('TkAgg')  # avoid backend conflicts in PyCharm
import matplotlib.pyplot as plt

# 1-based epoch numbers for the x axis
x_list = [i+1 for i in range(len(train_acc_list))]
# Train vs. test accuracy per epoch
plt.plot(x_list, train_acc_list, label="Train")
plt.plot(x_list, test_acc_list, label="Test")
plt.title("Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.legend()
plt.show()

# Train vs. test loss per epoch, drawn as a second figure
plt.plot(x_list, train_loss_list, label="Train")
plt.plot(x_list, test_loss_list, label="Test")
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

1.5.训练的主函数

主函数的作用

  • 完整的主函数包含以下功能:

    定义训练和测试用到的数据集和数据加载器

    定义模型结构

    定义损失函数与优化器

    运行训练函数与测试函数

    记录运行信息,完成数据保存

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from tqdm import tqdm  # pip install tqdm
import matplotlib
matplotlib.use('TkAgg')  # 避免 PyCharm 后端冲突
import matplotlib.pyplot as plt
import os

# 定义模型
class NeuralNetwork(nn.Module):
    """Feed-forward classifier producing 10 logits from a flattened 28x28 input."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        # Layer widths in order: input, two hidden layers, output.
        n_in, n_hidden, n_out = 28 * 28, 512, 10
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(n_in, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_out),
        )

    def forward(self, x):
        """Return raw class scores; no softmax is applied."""
        flattened = self.flatten(x)
        return self.linear_relu_stack(flattened)

# # 定义训练函数
def train(dataloader, model, loss_fn, optimizer, device=None):
    """Train model for one epoch.

    Args:
        dataloader: Iterable of ``(inputs, labels)`` batches; must expose
            ``.dataset`` and ``len()``.
        model: The ``nn.Module`` to optimize.
        loss_fn: Callable ``loss_fn(predictions, labels)`` returning a scalar
            tensor.
        optimizer: Optimizer holding the model's parameters.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used, so the function no longer
            relies on a global ``device`` that only exists when the file is
            run as a script.

    Returns:
        Tuple ``(accuracy, average_loss)`` rounded to three decimals.
        Accuracy is the fraction of samples predicted correctly during the
        epoch; average_loss is the mean of the per-batch losses.
    """
    if device is None:
        # Keep the data on the same device as the model's weights.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else "cpu"

    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()  # training mode

    loss_total = 0
    correct = 0
    for X, y in tqdm(dataloader):
        X, y = X.to(device), y.to(device)

        pred = model(X)
        # Count predictions whose highest-scoring class equals the label
        correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        loss = loss_fn(pred, y)
        loss_total += loss.item()

        loss.backward()  # compute gradients
        optimizer.step()  # update parameters
        optimizer.zero_grad()  # clear gradients so they do not accumulate

    loss_avg = loss_total / num_batches
    correct /= size
    return round(correct, 3), round(loss_avg, 3)

# 定义测试函数
def test(dataloader, model, loss_fn, device=None):
    """Evaluate model on dataloader without updating its parameters.

    Args:
        dataloader: Iterable of ``(inputs, labels)`` batches; must expose
            ``.dataset`` and ``len()``.
        model: The ``nn.Module`` to evaluate.
        loss_fn: Callable ``loss_fn(predictions, labels)`` returning a scalar
            tensor.
        device: Device the batches are moved to. When omitted, the device of
            the model's first parameter is used, so the function no longer
            relies on a global ``device`` that only exists when the file is
            run as a script.

    Returns:
        Tuple ``(accuracy, average_loss)`` rounded to three decimals.
    """
    if device is None:
        # Keep the data on the same device as the model's weights.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else "cpu"

    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()  # evaluation mode

    test_loss, correct = 0, 0

    # Gradients are not needed for evaluation; skipping them saves memory
    with torch.no_grad():
        for X, y in tqdm(dataloader):
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size

    return round(correct, 3), round(test_loss, 3)

def writedata(txt_log_name, epoch, train_accuracy, train_loss, test_accuracy, test_loss):
    """Append one tab-separated ``key:value`` record for an epoch to the log file."""
    with open(txt_log_name, "a+") as log_file:
        record = f"Epoch:{epoch}\ttrain_accuracy:{train_accuracy}\ttrain_loss:{train_loss}\ttest_accuracy:{test_accuracy}\ttest_loss:{test_loss}\n"
        log_file.write(record)



def _parse_log_line(line):
    """Turn one tab-separated ``key:value`` record into its five numeric values."""
    epoch, train_acc, train_loss, test_acc, test_loss = (
        field.split(':', 1)[1] for field in line.split('\t')
    )
    return int(epoch), float(train_acc), float(train_loss), float(test_acc), float(test_loss)


def plot_txt(log_txt_loc):
    """Plot accuracy and loss curves from a metrics log file.

    Each non-blank line of the file must hold five tab-separated
    ``key:value`` fields in the order epoch, train accuracy, train loss,
    test accuracy, test loss. Two side-by-side subplots are shown: train vs.
    test accuracy on the left, train vs. test loss on the right.

    Args:
        log_txt_loc: Path of the log file to read.

    Raises:
        ValueError: If the file contains no records or a line does not have
            exactly five fields.
    """
    with open(log_txt_loc, 'r') as f:
        # Blank lines (e.g. a trailing newline) carry no record; skip them.
        records = [_parse_log_line(line.strip()) for line in f if line.strip()]

    if not records:
        raise ValueError(f"no epoch records found in {log_txt_loc!r}")

    # Transpose the per-line records into one sequence per metric.
    epochs, train_accuracies, train_losses, test_accuracies, test_losses = zip(*records)
    # Integer tick positions covering the whole epoch range.
    epoch_ticks = range(min(epochs), max(epochs) + 1)

    plt.figure(figsize=(10, 5))

    # Left: accuracy curves
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_accuracies, label='Train Accuracy')
    plt.plot(epochs, test_accuracies, label='Test Accuracy')
    plt.title('Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.xticks(epoch_ticks)

    # Right: loss curves
    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_losses, label='Train Loss')
    plt.plot(epochs, test_losses, label='Test Loss')
    plt.title('Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.xticks(epoch_ticks)

    plt.tight_layout()
    plt.show()


if __name__ == '__main__':
    # Script entry point: train on FashionMNIST for a fixed number of epochs,
    # log metrics per epoch, keep the best and the latest checkpoint, then
    # plot the logged curves.
    log_root = "logs"
    log_txt_loc = os.path.join(log_root, "log.txt")
    os.makedirs(log_root, exist_ok=True)
    # writedata() appends to the log, so start each run from an empty file;
    # otherwise plot_txt() would draw records from earlier runs on the same axes.
    with open(log_txt_loc, "w"):
        pass

    # Training set
    training_data = datasets.FashionMNIST(
        root="data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

    # Test set
    test_data = datasets.FashionMNIST(
        root="data",
        train=False,
        download=True,
        transform=ToTensor(),
    )

    # Create the data loaders
    batch_size = 64

    train_dataloader = DataLoader(training_data, batch_size=batch_size)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    # Print the shape and label dtype of the first test batch, then stop
    for X, y in test_dataloader:
        print(f"Shape of X [N, C, H, W]: {X.shape}")
        print(f"Shape of y: {y.shape} {y.dtype}")
        break

    # Select the device: CUDA if available, otherwise CPU
    device = "cuda" if torch.cuda.is_available() else "cpu"

    print(f"Using {device} device")

    model = NeuralNetwork().to(device)
    print(model)

    # Loss function
    loss_fn = nn.CrossEntropyLoss()
    # Optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    best_acc = 0
    # Each epoch trains first, then tests
    epochs = 5
    for t in range(epochs):
        print(f"Epoch {t + 1}\n-------------------------------")
        train_acc, train_loss = train(train_dataloader, model, loss_fn, optimizer)
        test_acc, test_loss = test(test_dataloader, model, loss_fn)
        # Log the 1-based epoch number so the file matches the console output.
        writedata(log_txt_loc, t + 1, train_acc, train_loss, test_acc, test_loss)

        # Keep the checkpoint with the highest test accuracy seen so far
        if test_acc > best_acc:
            best_acc = test_acc
            torch.save(model.state_dict(), os.path.join(log_root, "best.pth"))

        # Always overwrite the checkpoint of the most recent epoch
        torch.save(model.state_dict(), os.path.join(log_root, "last.pth"))

    print("Done!")

    plot_txt(log_txt_loc)
最近更新: 2026/2/27 08:59
Contributors: yanpeng_, Programmer3.Cc