跳到主要内容

模型训练

模型训练是深度学习的核心环节。本章将详细介绍 PyTorch 中的完整训练流程,包括损失函数选择、优化器配置、学习率调度、训练循环实现、性能优化技巧以及调试方法。

训练流程概述

深度学习模型的训练本质上是一个优化问题:通过调整模型参数,最小化损失函数。PyTorch 的训练流程可以概括为以下几个阶段:

训练的数学基础

神经网络的训练目标是最小化损失函数 L(θ)L(\theta),其中 θ\theta 是模型参数。最常用的优化方法是梯度下降:

θt+1=θtηθL(θt)\theta_{t+1} = \theta_t - \eta \cdot \nabla_\theta L(\theta_t)

其中 η\eta 是学习率,θL(θt)\nabla_\theta L(\theta_t) 是损失函数对参数的梯度。PyTorch 的自动微分引擎(Autograd)会自动计算这些梯度。

损失函数详解

损失函数衡量模型预测值与真实值之间的差异。选择合适的损失函数对于模型收敛至关重要。

回归任务损失函数

MSELoss(均方误差)

MSELoss 计算预测值和真实值之间的平方差的均值,是回归任务最常用的损失函数:

L=1Ni=1N(yiy^i)2L = \frac{1}{N}\sum_{i=1}^{N}(y_i - \hat{y}_i)^2

import torch
import torch.nn as nn

# 创建损失函数
criterion = nn.MSELoss()

# 示例:预测房价
predictions = torch.tensor([250.0, 300.0, 180.0]) # 预测房价(万元)
targets = torch.tensor([260.0, 290.0, 175.0]) # 实际房价

loss = criterion(predictions, targets)
print(f"MSE Loss: {loss.item():.4f}") # 输出均方误差

特点

  • 对异常值敏感(因为平方会放大大误差)
  • 梯度与误差大小成正比,大误差会产生大梯度
  • 适用于预测值分布接近正态分布的场景

L1Loss(平均绝对误差)

L1Loss 计算预测值和真实值之间绝对差的均值:

L=1Ni=1Nyiy^iL = \frac{1}{N}\sum_{i=1}^{N}|y_i - \hat{y}_i|

criterion = nn.L1Loss()
loss = criterion(predictions, targets)

特点

  • 对异常值不敏感(绝对值不会放大大误差)
  • 梯度恒定,可能导致收敛较慢
  • 当数据存在异常值时比 MSELoss 更稳健

SmoothL1Loss(平滑 L1 损失)

SmoothL1Loss 也称为 Huber Loss,结合了 MSELoss 和 L1Loss 的优点:

L(x,y)={0.5(xy)2,if xy<1xy0.5,otherwiseL(x, y) = \begin{cases} 0.5(x-y)^2, & \text{if } |x-y| < 1 \\ |x-y| - 0.5, & \text{otherwise} \end{cases}

criterion = nn.SmoothL1Loss()
loss = criterion(predictions, targets)

特点

  • 当误差较小时使用平方损失,收敛更快
  • 当误差较大时使用线性损失,减少异常值的影响
  • 常用于目标检测的边界框回归

分类任务损失函数

CrossEntropyLoss(交叉熵损失)

CrossEntropyLoss 是多分类任务的标准损失函数,它内部集成了 LogSoftmaxNLLLoss

L=i=1Cyilog(y^i)L = -\sum_{i=1}^{C}y_i\log(\hat{y}_i)

import torch
import torch.nn as nn

# 创建损失函数
criterion = nn.CrossEntropyLoss()

# 示例:图像分类(10个类别)
# 模型输出:batch_size=3, num_classes=10
logits = torch.randn(3, 10)
# 真实标签:每个样本的类别索引
labels = torch.tensor([0, 5, 9])

loss = criterion(logits, labels)
print(f"CrossEntropy Loss: {loss.item():.4f}")

重要提示

  • 输入是未归一化的 logits(不需要先经过 Softmax)
  • 标签是类别索引,不是 one-hot 编码
  • 输出会自动对类别进行 Softmax 归一化

处理类别不平衡

当各类别样本数量差异较大时,可以为不同类别设置权重:

# 假设有 3 个类别,样本比例分别为 1:2:5
# 权重应该与样本比例成反比
weights = torch.tensor([5.0, 2.5, 1.0])
criterion = nn.CrossEntropyLoss(weight=weights)

# 也可以使用 class_weight 计算工具
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight(
'balanced',
classes=np.unique(y_train),
y=y_train
)
weights = torch.tensor(class_weights, dtype=torch.float32)

标签平滑

标签平滑可以防止模型过度自信,提高泛化能力:

# label_smoothing 参数:真实标签的概率会被平滑
# 例如:[0, 0, 1, 0] -> [ε/3, ε/3, 1-ε, ε/3]
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

BCELoss 和 BCEWithLogitsLoss(二分类交叉熵)

二分类任务使用二元交叉熵损失:

# BCELoss:需要先对输出应用 Sigmoid
criterion = nn.BCELoss()
output = torch.sigmoid(model(input)) # 手动应用 Sigmoid
loss = criterion(output, target)

# BCEWithLogitsLoss:内置 Sigmoid,数值更稳定(推荐)
criterion = nn.BCEWithLogitsLoss()
logits = model(input) # 直接使用 logits
loss = criterion(logits, target)

多标签分类示例

当每个样本可以属于多个类别时,使用 BCEWithLogitsLoss:

# 多标签分类:一张图片可能同时包含猫、狗、鸟
num_classes = 3
batch_size = 4

# 模型输出(未归一化的 logits)
logits = torch.randn(batch_size, num_classes)

# 多标签目标:每个类别独立标注(0 或 1)
targets = torch.tensor([
[1, 0, 1], # 包含猫和鸟
[0, 1, 0], # 只包含狗
[1, 1, 1], # 三个都有
[0, 0, 0], # 都不包含
], dtype=torch.float32)

criterion = nn.BCEWithLogitsLoss()
loss = criterion(logits, targets)

损失函数选择指南

任务类型推荐损失函数说明
二分类BCEWithLogitsLoss内置 Sigmoid,数值稳定
多分类CrossEntropyLoss内置 Softmax
多标签分类BCEWithLogitsLoss每个类别独立判断
回归(无异常值)MSELoss常用,收敛快
回归(有异常值)L1Loss 或 SmoothL1Loss对异常值不敏感
目标检测边界框SmoothL1Loss 或 IoU Loss位置回归
语义分割CrossEntropyLoss像素级分类

优化器详解

优化器负责根据梯度更新模型参数。不同的优化器有不同的更新策略,对训练效果有显著影响。

优化器的工作原理

所有优化器都基于梯度下降的基本原理,但采用不同的策略来调整学习率和更新方向。

SGD(随机梯度下降)

SGD 是最基础的优化器,直接沿着负梯度方向更新参数:

θt+1=θtηgt\theta_{t+1} = \theta_t - \eta \cdot g_t

其中 gtg_t 是当前梯度。

import torch.optim as optim

optimizer = optim.SGD(
model.parameters(),
lr=0.01 # 学习率
)

带动量的 SGD

动量(Momentum)通过累积历史梯度来加速收敛,减少震荡:

vt=γvt1+gtv_t = \gamma \cdot v_{t-1} + g_t θt+1=θtηvt\theta_{t+1} = \theta_t - \eta \cdot v_t

optimizer = optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9, # 动量系数,通常设为 0.9
weight_decay=1e-4 # L2 正则化系数
)

动量的作用

  • 在梯度方向一致的区域加速前进
  • 在梯度方向变化的区域减速,减少震荡
  • 帮助逃离局部最小值和鞍点

Adam(自适应矩估计)

Adam 是最流行的优化器,它为每个参数自适应地调整学习率:

mt=β1mt1+(1β1)gtm_t = \beta_1 \cdot m_{t-1} + (1-\beta_1) \cdot g_t (一阶矩估计) vt=β2vt1+(1β2)gt2v_t = \beta_2 \cdot v_{t-1} + (1-\beta_2) \cdot g_t^2 (二阶矩估计) m^t=mt1β1t,v^t=vt1β2t\hat{m}_t = \frac{m_t}{1-\beta_1^t}, \hat{v}_t = \frac{v_t}{1-\beta_2^t} (偏差修正) θt+1=θtηm^tv^t+ϵ\theta_{t+1} = \theta_t - \eta \cdot \frac{\hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon}

optimizer = optim.Adam(
model.parameters(),
lr=0.001, # 默认学习率
betas=(0.9, 0.999), # 一阶和二阶矩的衰减率
eps=1e-8, # 数值稳定性
weight_decay=0 # 权重衰减
)

Adam 的优势

  • 自适应学习率:每个参数有不同的有效学习率
  • 对初始学习率不敏感
  • 适合稀疏梯度和非平稳目标
  • 收敛速度快

Adam 的潜在问题

  • 可能在某些任务上泛化性能不如 SGD
  • 权重衰减实现方式可能不正确(推荐使用 AdamW)

AdamW(带正确权重衰减的 Adam)

AdamW 修正了 Adam 中权重衰减的实现,在训练 Transformer 等模型时效果更好:

optimizer = optim.AdamW(
model.parameters(),
lr=0.001,
betas=(0.9, 0.999),
weight_decay=0.01 # 解耦的权重衰减
)

Adam vs AdamW

  • Adam 中的 weight_decay 实际上是 L2 正则化,与自适应学习率耦合
  • AdamW 将权重衰减与梯度更新解耦,效果更好
  • 训练 Transformer、BERT 等模型时推荐使用 AdamW

优化器对比与选择

优化器优点缺点适用场景
SGD + Momentum泛化性能好、理论基础扎实收敛慢、需要仔细调学习率计算机视觉、学术研究
Adam收敛快、自适应学习率可能泛化差、内存占用高自然语言处理、通用场景
AdamW收敛快、权重衰减正确内存占用高Transformer、大规模模型
RMSprop适合 RNN不如 Adam 流行循环神经网络

选择建议

  1. 快速原型开发:使用 Adam,默认参数通常效果不错
  2. 追求最佳性能:尝试 SGD + Momentum,可能需要更多调参
  3. 训练 Transformer:使用 AdamW
  4. 训练 RNN/LSTM:考虑 RMSprop 或 Adam

学习率的重要性

学习率是最重要的超参数。过大的学习率会导致训练发散,过小的学习率会导致收敛缓慢。

# 学习率过大:损失震荡甚至发散
optimizer = optim.Adam(model.parameters(), lr=1.0) # 太大

# 学习率适中:稳定收敛
optimizer = optim.Adam(model.parameters(), lr=0.001) # 合适

# 学习率过小:收敛太慢
optimizer = optim.Adam(model.parameters(), lr=1e-6) # 太小

学习率调度

学习率调度器可以在训练过程中动态调整学习率,提高训练效果。

为什么需要学习率调度?

训练初期使用较大学习率可以快速接近最优解,后期减小学习率可以精细调整参数,避免在最优解附近震荡。

常用学习率调度器

StepLR(阶梯衰减)

每隔固定 epoch 数将学习率乘以衰减因子:

from torch.optim import lr_scheduler

scheduler = lr_scheduler.StepLR(
optimizer,
step_size=30, # 每 30 个 epoch 衰减一次
gamma=0.1 # 学习率乘以 0.1
)

# 学习率变化:0.001 -> 0.0001 (epoch 30) -> 0.00001 (epoch 60)

MultiStepLR(多阶梯衰减)

在指定的 epoch 衰减学习率:

scheduler = lr_scheduler.MultiStepLR(
optimizer,
milestones=[30, 60, 90], # 在第 30、60、90 个 epoch 衰减
gamma=0.1
)

ExponentialLR(指数衰减)

每个 epoch 按指数衰减学习率:

scheduler = lr_scheduler.ExponentialLR(
optimizer,
gamma=0.95 # 每个epoch:lr = lr * 0.95
)

CosineAnnealingLR(余弦退火)

学习率按余弦曲线从初始值衰减到最小值:

ηt=ηmin+12(ηmaxηmin)(1+cos(tTmaxπ))\eta_t = \eta_{min} + \frac{1}{2}(\eta_{max} - \eta_{min})(1 + \cos(\frac{t}{T_{max}}\pi))

scheduler = lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=50, # 一个周期的 epoch 数
eta_min=1e-6 # 最小学习率
)

余弦退火的优势:

  • 平滑衰减,避免突变
  • 在训练后期学习率非常小,有利于精细调整
  • 广泛用于现代深度学习模型训练

CosineAnnealingWarmRestarts(带热重启的余弦退火)

周期性地将学习率重置到初始值,有助于逃离局部最优:

scheduler = lr_scheduler.CosineAnnealingWarmRestarts(
optimizer,
T_0=10, # 第一个周期的长度
T_mult=2, # 每次重启后周期长度的倍数
eta_min=1e-6
)

ReduceLROnPlateau(自适应衰减)

当验证损失不再下降时衰减学习率:

scheduler = lr_scheduler.ReduceLROnPlateau(
optimizer,
mode='min', # 监控指标越小越好
factor=0.1, # 衰减因子
patience=5, # 容忍多少个 epoch 没有改善
verbose=True, # 打印学习率变化
min_lr=1e-6 # 最小学习率
)

# 在训练循环中使用
val_loss = validate(model, val_loader)
scheduler.step(val_loss) # 传入监控指标

OneCycleLR(单周期策略)

OneCycleLR 在一个训练周期中先增大后减小学习率,是 Leslie Smith 提出的一种高效策略:

scheduler = lr_scheduler.OneCycleLR(
optimizer,
max_lr=0.01, # 最大学习率
epochs=50,
steps_per_epoch=len(train_loader),
pct_start=0.3, # 前 30% 时间增大学习率
anneal_strategy='cos', # 使用余弦退火
div_factor=25, # 初始学习率 = max_lr / 25
final_div_factor=1e4 # 最终学习率 = max_lr / 1e4
)

# 注意:OneCycleLR 每个 batch 后都要调用
for batch in train_loader:
optimizer.step()
scheduler.step() # 每个 batch 更新一次

OneCycleLR 的优势

  • 无需手动调整学习率
  • 训练速度快,效果好
  • 超参数少,易于使用

学习率调度器的使用方式

# 方式一:每个 epoch 后调用(大多数调度器)
for epoch in range(num_epochs):
train(model, train_loader, optimizer, criterion)
val_loss = validate(model, val_loader, criterion)

scheduler.step() # 更新学习率
# 注意:如果使用 ReduceLROnPlateau,需要传入指标
# scheduler.step(val_loss)

# 方式二:每个 batch 后调用(OneCycleLR, CyclicLR 等)
for epoch in range(num_epochs):
for batch in train_loader:
loss = train_step(model, batch, optimizer, criterion)
optimizer.step()
scheduler.step() # 每个 batch 更新

学习率调度器选择建议

调度器适用场景特点
StepLR常规训练简单,可预测
MultiStepLR需要精细控制灵活指定衰减时机
CosineAnnealingLR现代模型训练平滑衰减,效果好
ReduceLROnPlateau不确定训练时长自适应调整
OneCycleLR快速实验自动化程度高,效果好

完整训练循环

基础训练循环

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# 1. 准备数据
# 假设我们有一个简单的回归任务
X_train = torch.randn(1000, 20)
y_train = torch.randn(1000, 1)
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

X_val = torch.randn(200, 20)
y_val = torch.randn(200, 1)
val_dataset = TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32)

# 2. 定义模型
class SimpleModel(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1)
)

def forward(self, x):
return self.network(x)

model = SimpleModel(input_dim=20)

# 3. 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)

# 4. 训练函数
def train_epoch(model, train_loader, criterion, optimizer, device):
"""训练一个 epoch"""
model.train()
total_loss = 0.0

for batch_X, batch_y in train_loader:
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)

# 前向传播
outputs = model(batch_X)
loss = criterion(outputs, batch_y)

# 反向传播
optimizer.zero_grad() # 清零梯度
loss.backward() # 计算梯度
optimizer.step() # 更新参数

total_loss += loss.item() * batch_X.size(0)

return total_loss / len(train_loader.dataset)

# 5. 验证函数
def validate(model, val_loader, criterion, device):
"""验证模型"""
model.eval()
total_loss = 0.0

with torch.no_grad(): # 不计算梯度
for batch_X, batch_y in val_loader:
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)

outputs = model(batch_X)
loss = criterion(outputs, batch_y)

total_loss += loss.item() * batch_X.size(0)

return total_loss / len(val_loader.dataset)

# 6. 完整训练循环
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

num_epochs = 100
best_val_loss = float('inf')

for epoch in range(num_epochs):
# 训练
train_loss = train_epoch(model, train_loader, criterion, optimizer, device)

# 验证
val_loss = validate(model, val_loader, criterion, device)

# 更新学习率
scheduler.step()

# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': val_loss,
}, 'best_model.pth')

# 打印训练信息
if (epoch + 1) % 10 == 0:
current_lr = optimizer.param_groups[0]['lr']
print(f'Epoch [{epoch+1}/{num_epochs}] '
f'Train Loss: {train_loss:.4f} '
f'Val Loss: {val_loss:.4f} '
f'LR: {current_lr:.6f}')

print(f'训练完成!最佳验证损失: {best_val_loss:.4f}')

训练循环的关键步骤

理解训练循环中每个步骤的作用非常重要:

for batch in train_loader:
# 步骤 1:清零梯度
optimizer.zero_grad()

# 步骤 2:前向传播
outputs = model(inputs)

# 步骤 3:计算损失
loss = criterion(outputs, targets)

# 步骤 4:反向传播(计算梯度)
loss.backward()

# 步骤 5:更新参数
optimizer.step()

为什么要 zero_grad()?

PyTorch 默认会累积梯度(适用于 RNN 等场景),但大多数情况下我们需要每个 batch 重新计算梯度,所以必须手动清零:

# 如果不清零梯度
for batch in train_loader:
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward() # 梯度会累积到之前的梯度上
optimizer.step()
# 结果:梯度越来越大,训练不稳定

# 正确做法
for batch in train_loader:
optimizer.zero_grad() # 清零梯度
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()

混合精度训练

混合精度训练(Mixed Precision Training)使用 16 位浮点数(FP16 或 BF16)进行部分计算,可以显著提升训练速度并减少显存占用。

为什么使用混合精度?

传统的深度学习训练使用 32 位浮点数(FP32),但这存在两个问题:

  1. 显存占用大:FP32 每个数占 4 字节
  2. 计算速度慢:现代 GPU 对低精度计算有专门优化

混合精度训练使用 FP16 进行大部分计算,但对数值敏感的操作仍使用 FP32,从而在保持精度的同时获得性能提升。

PyTorch AMP 使用方法

PyTorch 的 torch.amp 模块(Automatic Mixed Precision)让混合精度训练变得简单:

import torch
from torch.amp import autocast, GradScaler

# 创建梯度缩放器(仅在使用 FP16 时需要)
scaler = GradScaler('cuda')

model = model.cuda()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
for inputs, targets in train_loader:
inputs = inputs.cuda()
targets = targets.cuda()

optimizer.zero_grad()

# 使用 autocast 进行混合精度前向传播
with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)

# 使用 scaler 进行反向传播
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()

autocast 的作用

autocast 上下文管理器会自动选择每个操作的数据类型:

  • 适合 FP16 的操作:卷积、矩阵乘法等(速度快)
  • 需要 FP32 的操作:softmax、损失函数等(数值稳定)
from torch.amp import autocast

# CUDA 设备
with autocast(device_type='cuda', dtype=torch.float16):
output = model(input)

# CPU 设备(使用 bfloat16)
with autocast(device_type='cpu', dtype=torch.bfloat16):
output = model(input)

GradScaler 的作用

FP16 的数值范围有限,小梯度可能变为 0(下溢)。GradScaler 通过缩放损失来防止这种情况:

scaler = GradScaler(
device='cuda',
init_scale=2**16, # 初始缩放因子
growth_factor=2.0, # 无溢出时的增长倍数
backoff_factor=0.5, # 溢出时的衰减倍数
growth_interval=2000 # 每隔多少步尝试增大缩放因子
)

# 训练步骤
optimizer.zero_grad()
with autocast(device_type='cuda'):
loss = model(input)

# 缩放损失,反向传播
scaler.scale(loss).backward()

# 取消缩放梯度,更新参数
scaler.step(optimizer)

# 更新缩放因子
scaler.update()

BF16 vs FP16

特性FP16BF16
数值范围10810^{-8} ~ 6550465504103810^{-38} ~ 103810^{38}
精度
硬件支持广泛Ampere 及更新 GPU
溢出风险
需要 GradScaler

推荐

  • 如果 GPU 支持 BF16(Ampere 及更新),优先使用 BF16
  • 使用 BF16 时可以不需要 GradScaler
# 使用 BF16(不需要 GradScaler)
with autocast(device_type='cuda', dtype=torch.bfloat16):
output = model(input)
loss = criterion(output, target)

loss.backward()
optimizer.step()

混合精度训练最佳实践

import torch
import torch.nn as nn
import torch.optim as optim
from torch.amp import autocast, GradScaler
from torch.utils.data import DataLoader

def train_with_amp(model, train_loader, num_epochs):
device = torch.device('cuda')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)

# 使用 FP16
scaler = GradScaler('cuda')
use_amp = True

for epoch in range(num_epochs):
model.train()
for inputs, targets in train_loader:
inputs = inputs.to(device)
targets = targets.to(device)

optimizer.zero_grad()

if use_amp:
# 混合精度训练
with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)

scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
# 普通 FP32 训练
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()

return model

性能对比

在 NVIDIA A100 GPU 上,混合精度训练通常可以获得:

  • 速度提升:1.5x - 3x
  • 显存节省:约 50%
  • 精度损失:通常可忽略

torch.compile 加速训练

PyTorch 2.0 引入了 torch.compile,可以在不修改代码的情况下显著加速模型训练。

基本用法

import torch

model = MyModel()

# 一行代码编译模型
model = torch.compile(model)

# 正常使用,无需修改其他代码
output = model(input)

编译模式

torch.compile 支持多种编译模式,平衡编译时间和执行效率:

# 默认模式:平衡编译时间和执行效率
model = torch.compile(model)

# 最大优化模式:编译时间更长,运行更快
model = torch.compile(model, mode='max-autotune')

# 减少内存模式:降低内存占用
model = torch.compile(model, mode='reduce-overhead')

# 默认模式(显式指定)
model = torch.compile(model, mode='default')

编译特定部分

可以只编译模型的特定部分:

class MyModel(nn.Module):
def __init__(self):
super().__init__()
self.backbone = ResNet()
self.head = nn.Linear(512, 10)

# 只编译骨干网络
self.backbone = torch.compile(self.backbone)

def forward(self, x):
x = self.backbone(x)
x = self.head(x)
return x

动态形状支持

对于输入形状变化的模型:

# 支持动态形状
model = torch.compile(model, dynamic=True)

torch.compile 与 AMP 结合

import torch
from torch.amp import autocast, GradScaler

model = torch.compile(model)
scaler = GradScaler('cuda')

for inputs, targets in train_loader:
optimizer.zero_grad()

with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)

scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()

编译优化器

PyTorch 2.2+ 支持编译优化器,进一步提升性能:

import torch

model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 编译模型和优化器
model = torch.compile(model)
optimizer = torch.compile(optimizer)

性能优化技巧

  1. 首次运行较慢:编译需要时间,首次运行会较慢,后续运行会很快
  2. 避免频繁改变输入形状:这会触发重新编译
  3. 使用 mode='reduce-overhead' 减少内存占用
  4. 对于大模型效果更明显:小模型可能看不出明显加速

高级训练技巧

梯度裁剪

梯度裁剪可以防止梯度爆炸,稳定训练过程:

import torch.nn.utils as utils

# 训练循环中
for inputs, targets in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()

# 梯度裁剪:限制梯度的最大范数
max_grad_norm = 1.0
utils.clip_grad_norm_(model.parameters(), max_grad_norm)

optimizer.step()

何时使用梯度裁剪

  • RNN/LSTM 训练(梯度爆炸常见)
  • 训练不稳定时
  • 使用大学习率时

梯度累积

当 GPU 显存不足,无法使用大 batch size 时,可以使用梯度累积:

accumulation_steps = 4  # 累积 4 个 batch 后更新
optimizer.zero_grad()

for i, (inputs, targets) in enumerate(train_loader):
outputs = model(inputs)
loss = criterion(outputs, targets)

# 缩放损失
loss = loss / accumulation_steps
loss.backward()

# 累积够步数后更新参数
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()

等效 batch size:实际 batch size = batch_size × accumulation_steps

早停法(Early Stopping)

防止过拟合,在验证损失不再改善时停止训练:

class EarlyStopping:
def __init__(self, patience=7, min_delta=0):
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.best_loss = None
self.early_stop = False

def __call__(self, val_loss):
if self.best_loss is None:
self.best_loss = val_loss
elif val_loss > self.best_loss - self.min_delta:
self.counter += 1
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_loss = val_loss
self.counter = 0

# 使用早停
early_stopping = EarlyStopping(patience=10, min_delta=0.001)

for epoch in range(num_epochs):
train_loss = train_epoch(model, train_loader, criterion, optimizer)
val_loss = validate(model, val_loader, criterion)

early_stopping(val_loss)
if early_stopping.early_stop:
print(f"早停于 epoch {epoch}")
break

模型权重平均(EMA)

指数移动平均可以使模型更加稳定:

class EMA:
def __init__(self, model, decay=0.999):
self.model = model
self.decay = decay
self.shadow = {}
self.backup = {}

# 初始化影子参数
for name, param in model.named_parameters():
if param.requires_grad:
self.shadow[name] = param.data.clone()

def update(self):
"""更新影子参数"""
for name, param in self.model.named_parameters():
if param.requires_grad:
new_average = self.decay * self.shadow[name] + (1 - self.decay) * param.data
self.shadow[name] = new_average.clone()

def apply_shadow(self):
"""应用影子参数(用于评估)"""
for name, param in self.model.named_parameters():
if param.requires_grad:
self.backup[name] = param.data.clone()
param.data = self.shadow[name]

def restore(self):
"""恢复原始参数"""
for name, param in self.model.named_parameters():
if param.requires_grad:
param.data = self.backup[name]
self.backup = {}

# 使用 EMA
ema = EMA(model, decay=0.999)

for epoch in range(num_epochs):
train_epoch(model, train_loader, criterion, optimizer)
ema.update() # 每个 epoch 更新

# 验证时使用 EMA 权重
ema.apply_shadow()
val_loss = validate(model, val_loader, criterion)
ema.restore()

学习率预热(Warmup)

训练开始时使用小学习率,逐步增加到目标学习率:

def get_lr_warmup_scheduler(optimizer, warmup_epochs, target_lr):
"""简单的学习率预热"""
def lr_lambda(epoch):
if epoch < warmup_epochs:
return (epoch + 1) / warmup_epochs
return 1.0

return optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

scheduler = get_lr_warmup_scheduler(optimizer, warmup_epochs=5, target_lr=0.001)

权重初始化

良好的权重初始化可以加速收敛:

def init_weights(m):
"""权重初始化函数"""
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight) # Xavier 初始化
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)

# 应用初始化
model.apply(init_weights)

常用初始化方法

  • xavier_uniform_:适用于 tanh、sigmoid 激活函数
  • kaiming_normal_:适用于 ReLU 激活函数
  • trunc_normal_:适用于 Transformer

训练调试技巧

监控梯度

检查梯度可以帮助诊断训练问题:

def monitor_gradients(model):
"""监控模型梯度统计信息"""
for name, param in model.named_parameters():
if param.grad is not None:
grad_norm = param.grad.norm().item()
grad_max = param.grad.max().item()
grad_min = param.grad.min().item()

print(f"{name}: norm={grad_norm:.6f}, "
f"max={grad_max:.6f}, min={grad_min:.6f}")

# 在训练中调用
loss.backward()
monitor_gradients(model)

检查模型输出

def check_model_output(model, inputs):
"""检查模型输出是否正常"""
model.eval()
with torch.no_grad():
outputs = model(inputs)

print(f"输出形状: {outputs.shape}")
print(f"输出范围: [{outputs.min().item():.4f}, {outputs.max().item():.4f}]")
print(f"输出均值: {outputs.mean().item():.4f}")
print(f"输出标准差: {outputs.std().item():.4f}")

# 检查是否有 NaN 或 Inf
if torch.isnan(outputs).any():
print("警告:输出包含 NaN!")
if torch.isinf(outputs).any():
print("警告:输出包含 Inf!")

过拟合单个 batch

在开始完整训练前,先验证模型能否过拟合单个 batch:

# 获取单个 batch
single_batch = next(iter(train_loader))
inputs, targets = single_batch

# 训练 100 个 epoch,看损失是否降到接近 0
for epoch in range(100):
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()

if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.6f}")

# 如果损失降到接近 0,说明模型有学习能力
# 如果损失不下降,说明模型或数据有问题

学习率范围测试

找到合适的学习率范围:

def find_lr(model, train_loader, criterion, optimizer, 
init_value=1e-8, final_value=10, num_iter=100):
"""学习率范围测试"""
model.train()

# 设置初始学习率
for param_group in optimizer.param_groups:
param_group['lr'] = init_value

# 计算学习率增长因子
mult = (final_value / init_value) ** (1 / num_iter)

losses = []
lrs = []
best_loss = float('inf')

iterator = iter(train_loader)
for i in range(num_iter):
try:
inputs, targets = next(iterator)
except StopIteration:
iterator = iter(train_loader)
inputs, targets = next(iterator)

optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()

losses.append(loss.item())
lrs.append(optimizer.param_groups[0]['lr'])

# 更新学习率
for param_group in optimizer.param_groups:
param_group['lr'] *= mult

# 如果损失爆炸,提前停止
if loss.item() > best_loss * 10:
break

best_loss = min(best_loss, loss.item())

return lrs, losses

# 使用
lrs, losses = find_lr(model, train_loader, criterion, optimizer)
# 绘制学习率-损失曲线,找到损失快速下降的学习率

常见问题排查

问题可能原因解决方案
损失是 NaN学习率过大、梯度爆炸降低学习率、梯度裁剪
损失不下降学习率过小、模型问题增大学习率、检查模型
训练震荡学习率过大降低学习率、使用动量
过拟合模型太复杂、数据太少正则化、数据增强、早停
欠拟合模型太简单、训练不够增加模型容量、延长训练

断点续训

训练过程可能因各种原因中断,需要支持断点续训:

import os

def save_checkpoint(model, optimizer, scheduler, epoch, loss, path):
"""保存检查点"""
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
'loss': loss,
}, path)

def load_checkpoint(model, optimizer, scheduler, path):
"""加载检查点"""
if os.path.exists(path):
checkpoint = torch.load(path)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
if scheduler and checkpoint['scheduler_state_dict']:
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
return checkpoint['epoch'], checkpoint['loss']
return 0, float('inf')

# 使用示例
checkpoint_path = 'checkpoint.pth'
start_epoch, best_loss = load_checkpoint(model, optimizer, scheduler, checkpoint_path)

for epoch in range(start_epoch, num_epochs):
train_loss = train_epoch(model, train_loader, criterion, optimizer)
val_loss = validate(model, val_loader, criterion)

scheduler.step()

# 定期保存检查点
if (epoch + 1) % 5 == 0:
save_checkpoint(model, optimizer, scheduler, epoch + 1, val_loss, checkpoint_path)

# 保存最佳模型
if val_loss < best_loss:
best_loss = val_loss
torch.save(model.state_dict(), 'best_model.pth')

小结

本章我们深入学习了 PyTorch 模型训练的完整流程:

  1. 损失函数:回归任务使用 MSE/L1/SmoothL1,分类任务使用 CrossEntropy/BCE
  2. 优化器:SGD 适合追求泛化,Adam 适合快速收敛,AdamW 适合 Transformer
  3. 学习率调度:余弦退火效果好,OneCycleLR 自动化程度高
  4. 训练循环:理解 zero_grad、backward、step 的作用
  5. 混合精度训练:使用 torch.amp 加速训练、节省显存
  6. torch.compile:一行代码加速模型
  7. 高级技巧:梯度裁剪、梯度累积、早停、EMA
  8. 调试方法:监控梯度、过拟合单 batch、学习率范围测试

练习

  1. 实现一个完整的分类任务训练流程,包含训练、验证、测试
  2. 尝试不同的优化器(SGD、Adam、AdamW),比较收敛速度和最终效果
  3. 使用混合精度训练,对比 FP32 训练的速度和显存占用
  4. 实现早停法和学习率预热
  5. 编写一个完整的断点续训功能

参考资料