模型训练
模型训练是深度学习的核心环节。本章将详细介绍 PyTorch 中的完整训练流程,包括损失函数选择、优化器配置、学习率调度、训练循环实现、性能优化技巧以及调试方法。
训练流程概述
深度学习模型的训练本质上是一个优化问题:通过调整模型参数,最小化损失函数。PyTorch 的训练流程可以概括为以下几个阶段:
训练的数学基础:
神经网络的训练目标是最小化损失函数 ,其中 是模型参数。最常用的优化方法是梯度下降:
其中 是学习率, 是损失函数对参数的梯度。PyTorch 的自动微分引擎(Autograd)会自动计算这些梯度。
损失函数详解
损失函数衡量模型预测值与真实值之间的差异。选择合适的损失函数对于模型收敛至关重要。
回归任务损失函数
MSELoss(均方误差)
MSELoss 计算预测值和真实值之间的平方差的均值,是回归任务最常用的损失函数:
import torch
import torch.nn as nn
# 创建损失函数
criterion = nn.MSELoss()
# 示例:预测房价
predictions = torch.tensor([250.0, 300.0, 180.0]) # 预测房价(万元)
targets = torch.tensor([260.0, 290.0, 175.0]) # 实际房价
loss = criterion(predictions, targets)
print(f"MSE Loss: {loss.item():.4f}") # 输出均方误差
特点:
- 对异常值敏感(因为平方会放大大误差)
- 梯度与误差大小成正比,大误差会产生大梯度
- 适用于预测值分布接近正态分布的场景
L1Loss(平均绝对误差)
L1Loss 计算预测值和真实值之间绝对差的均值:
criterion = nn.L1Loss()
loss = criterion(predictions, targets)
特点:
- 对异常值不敏感(绝对值不会放大大误差)
- 梯度恒定,可能导致收敛较慢
- 当数据存在异常值时比 MSELoss 更稳健
SmoothL1Loss(平滑 L1 损失)
SmoothL1Loss 也称为 Huber Loss,结合了 MSELoss 和 L1Loss 的优点:
criterion = nn.SmoothL1Loss()
loss = criterion(predictions, targets)
特点:
- 当误差较小时使用平方损失,收敛更快
- 当误差较大时使用线性损失,减少异常值的影响
- 常用于目标检测的边界框回归
分类任务损失函数
CrossEntropyLoss(交叉熵损失)
CrossEntropyLoss 是多分类任务的标准损失函数,它内部集成了 LogSoftmax 和 NLLLoss:
import torch
import torch.nn as nn
# 创建损失函数
criterion = nn.CrossEntropyLoss()
# 示例:图像分类(10个类别)
# 模型输出:batch_size=3, num_classes=10
logits = torch.randn(3, 10)
# 真实标签:每个样本的类别索引
labels = torch.tensor([0, 5, 9])
loss = criterion(logits, labels)
print(f"CrossEntropy Loss: {loss.item():.4f}")
重要提示:
- 输入是未归一化的 logits(不需要先经过 Softmax)
- 标签是类别索引,不是 one-hot 编码
- 输出会自动对类别进行 Softmax 归一化
处理类别不平衡:
当各类别样本数量差异较大时,可以为不同类别设置权重:
# 假设有 3 个类别,样本比例分别为 1:2:5
# 权重应该与样本比例成反比
weights = torch.tensor([5.0, 2.5, 1.0])
criterion = nn.CrossEntropyLoss(weight=weights)
# 也可以使用 class_weight 计算工具
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight(
'balanced',
classes=np.unique(y_train),
y=y_train
)
weights = torch.tensor(class_weights, dtype=torch.float32)
标签平滑:
标签平滑可以防止模型过度自信,提高泛化能力:
# label_smoothing 参数:真实标签的概率会被平滑
# 例如:[0, 0, 1, 0] -> [ε/3, ε/3, 1-ε, ε/3]
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
BCELoss 和 BCEWithLogitsLoss(二分类交叉熵)
二分类任务使用二元交叉熵损失:
# BCELoss:需要先对输出应用 Sigmoid
criterion = nn.BCELoss()
output = torch.sigmoid(model(input)) # 手动应用 Sigmoid
loss = criterion(output, target)
# BCEWithLogitsLoss:内置 Sigmoid,数值更稳定(推荐)
criterion = nn.BCEWithLogitsLoss()
logits = model(input) # 直接使用 logits
loss = criterion(logits, target)
多标签分类示例:
当每个样本可以属于多个类别时,使用 BCEWithLogitsLoss:
# 多标签分类:一张图片可能同时包含猫、狗、鸟
num_classes = 3
batch_size = 4
# 模型输出(未归一化的 logits)
logits = torch.randn(batch_size, num_classes)
# 多标签目标:每个类别独立标注(0 或 1)
targets = torch.tensor([
[1, 0, 1], # 包含猫和鸟
[0, 1, 0], # 只包含狗
[1, 1, 1], # 三个都有
[0, 0, 0], # 都不包含
], dtype=torch.float32)
criterion = nn.BCEWithLogitsLoss()
loss = criterion(logits, targets)
损失函数选择指南
| 任务类型 | 推荐损失函数 | 说明 |
|---|---|---|
| 二分类 | BCEWithLogitsLoss | 内置 Sigmoid,数值稳定 |
| 多分类 | CrossEntropyLoss | 内置 Softmax |
| 多标签分类 | BCEWithLogitsLoss | 每个类别独立判断 |
| 回归(无异常值) | MSELoss | 常用,收敛快 |
| 回归(有异常值) | L1Loss 或 SmoothL1Loss | 对异常值不敏感 |
| 目标检测边界框 | SmoothL1Loss 或 IoU Loss | 位置回归 |
| 语义分割 | CrossEntropyLoss | 像素级分类 |
优化器详解
优化器负责根据梯度更新模型参数。不同的优化器有不同的更新策略,对训练效果有显著影响。
优化器的工作原理
所有优化器都基于梯度下降的基本原理,但采用不同的策略来调整学习率和更新方向。
SGD(随机梯度下降)
SGD 是最基础的优化器,直接沿着负梯度方向更新参数:
其中 是当前梯度。
import torch.optim as optim
optimizer = optim.SGD(
model.parameters(),
lr=0.01 # 学习率
)
带动量的 SGD:
动量(Momentum)通过累积历史梯度来加速收敛,减少震荡:
optimizer = optim.SGD(
model.parameters(),
lr=0.01,
momentum=0.9, # 动量系数,通常设为 0.9
weight_decay=1e-4 # L2 正则化系数
)
动量的作用:
- 在梯度方向一致的区域加速前进
- 在梯度方向变化的区域减速,减少震荡
- 帮助逃离局部最小值和鞍点
Adam(自适应矩估计)
Adam 是最流行的优化器,它为每个参数自适应地调整学习率:
(一阶矩估计) (二阶矩估计) (偏差修正)
optimizer = optim.Adam(
model.parameters(),
lr=0.001, # 默认学习率
betas=(0.9, 0.999), # 一阶和二阶矩的衰减率
eps=1e-8, # 数值稳定性
weight_decay=0 # 权重衰减
)
Adam 的优势:
- 自适应学习率:每个参数有不同的有效学习率
- 对初始学习率不敏感
- 适合稀疏梯度和非平稳目标
- 收敛速度快
Adam 的潜在问题:
- 可能在某些任务上泛化性能不如 SGD
- 权重衰减实现方式可能不正确(推荐使用 AdamW)
AdamW(带正确权重衰减的 Adam)
AdamW 修正了 Adam 中权重衰减的实现,在训练 Transformer 等模型时效果更好:
optimizer = optim.AdamW(
model.parameters(),
lr=0.001,
betas=(0.9, 0.999),
weight_decay=0.01 # 解耦的权重衰减
)
Adam vs AdamW:
- Adam 中的 weight_decay 实际上是 L2 正则化,与自适应学习率耦合
- AdamW 将权重衰减与梯度更新解耦,效果更好
- 训练 Transformer、BERT 等模型时推荐使用 AdamW
优化器对比与选择
| 优化器 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| SGD + Momentum | 泛化性能好、理论基础扎实 | 收敛慢、需要仔细调学习率 | 计算机视觉、学术研究 |
| Adam | 收敛快、自适应学习率 | 可能泛化差、内存占用高 | 自然语言处理、通用场景 |
| AdamW | 收敛快、权重衰减正确 | 内存占用高 | Transformer、大规模模型 |
| RMSprop | 适合 RNN | 不如 Adam 流行 | 循环神经网络 |
选择建议:
- 快速原型开发:使用 Adam,默认参数通常效果不错
- 追求最佳性能:尝试 SGD + Momentum,可能需要更多调参
- 训练 Transformer:使用 AdamW
- 训练 RNN/LSTM:考虑 RMSprop 或 Adam
学习率的重要性
学习率是最重要的超参数。过大的学习率会导致训练发散,过小的学习率会导致收敛缓慢。
# 学习率过大:损失震荡甚至发散
optimizer = optim.Adam(model.parameters(), lr=1.0) # 太大
# 学习率适中:稳定收敛
optimizer = optim.Adam(model.parameters(), lr=0.001) # 合适
# 学习率过小:收敛太慢
optimizer = optim.Adam(model.parameters(), lr=1e-6) # 太小
学习率调度
学习率调度器可以在训练过程中动态调整学习率,提高训练效果。
为什么需要学习率调度?
训练初期使用较大学习率可以快速接近最优解,后期减小学习率可以精细调整参数,避免在最优解附近震荡。
常用学习率调度器
StepLR(阶梯衰减)
每隔固定 epoch 数将学习率乘以衰减因子:
from torch.optim import lr_scheduler
scheduler = lr_scheduler.StepLR(
optimizer,
step_size=30, # 每 30 个 epoch 衰减一次
gamma=0.1 # 学习率乘以 0.1
)
# 学习率变化:0.001 -> 0.0001 (epoch 30) -> 0.00001 (epoch 60)
MultiStepLR(多阶梯衰减)
在指定的 epoch 衰减学习率:
scheduler = lr_scheduler.MultiStepLR(
optimizer,
milestones=[30, 60, 90], # 在第 30、60、90 个 epoch 衰减
gamma=0.1
)
ExponentialLR(指数衰减)
每个 epoch 按指数衰减学习率:
scheduler = lr_scheduler.ExponentialLR(
optimizer,
gamma=0.95 # 每个epoch:lr = lr * 0.95
)
CosineAnnealingLR(余弦退火)
学习率按余弦曲线从初始值衰减到最小值:
scheduler = lr_scheduler.CosineAnnealingLR(
optimizer,
T_max=50, # 一个周期的 epoch 数
eta_min=1e-6 # 最小学习率
)
余弦退火的优势:
- 平滑衰减,避免突变
- 在训练后期学习率非常小,有利于精细调整
- 广泛用于现代深度学习模型训练
CosineAnnealingWarmRestarts(带热重启的余弦退火)
周期性地将学习率重置到初始值,有助于逃离局部最优:
scheduler = lr_scheduler.CosineAnnealingWarmRestarts(
optimizer,
T_0=10, # 第一个周期的长度
T_mult=2, # 每次重启后周期长度的倍数
eta_min=1e-6
)
ReduceLROnPlateau(自适应衰减)
当验证损失不再下降时衰减学习率:
scheduler = lr_scheduler.ReduceLROnPlateau(
optimizer,
mode='min', # 监控指标越小越好
factor=0.1, # 衰减因子
patience=5, # 容忍多少个 epoch 没有改善
verbose=True, # 打印学习率变化
min_lr=1e-6 # 最小学习率
)
# 在训练循环中使用
val_loss = validate(model, val_loader)
scheduler.step(val_loss) # 传入监控指标
OneCycleLR(单周期策略)
OneCycleLR 在一个训练周期中先增大后减小学习率,是 Leslie Smith 提出的一种高效策略:
scheduler = lr_scheduler.OneCycleLR(
optimizer,
max_lr=0.01, # 最大学习率
epochs=50,
steps_per_epoch=len(train_loader),
pct_start=0.3, # 前 30% 时间增大学习率
anneal_strategy='cos', # 使用余弦退火
div_factor=25, # 初始学习率 = max_lr / 25
final_div_factor=1e4 # 最终学习率 = max_lr / 1e4
)
# 注意:OneCycleLR 每个 batch 后都要调用
for batch in train_loader:
optimizer.step()
scheduler.step() # 每个 batch 更新一次
OneCycleLR 的优势:
- 无需手动调整学习率
- 训练速度快,效果好
- 超参数少,易于使用
学习率调度器的使用方式
# 方式一:每个 epoch 后调用(大多数调度器)
for epoch in range(num_epochs):
train(model, train_loader, optimizer, criterion)
val_loss = validate(model, val_loader, criterion)
scheduler.step() # 更新学习率
# 注意:如果使用 ReduceLROnPlateau,需要传入指标
# scheduler.step(val_loss)
# 方式二:每个 batch 后调用(OneCycleLR, CyclicLR 等)
for epoch in range(num_epochs):
for batch in train_loader:
loss = train_step(model, batch, optimizer, criterion)
optimizer.step()
scheduler.step() # 每个 batch 更新
学习率调度器选择建议
| 调度器 | 适用场景 | 特点 |
|---|---|---|
| StepLR | 常规训练 | 简单,可预测 |
| MultiStepLR | 需要精细控制 | 灵活指定衰减时机 |
| CosineAnnealingLR | 现代模型训练 | 平滑衰减,效果好 |
| ReduceLROnPlateau | 不确定训练时长 | 自适应调整 |
| OneCycleLR | 快速实验 | 自动化程度高,效果好 |
完整训练循环
基础训练循环
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
# 1. 准备数据
# 假设我们有一个简单的回归任务
X_train = torch.randn(1000, 20)
y_train = torch.randn(1000, 1)
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
X_val = torch.randn(200, 20)
y_val = torch.randn(200, 1)
val_dataset = TensorDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=32)
# 2. 定义模型
class SimpleModel(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1)
)
def forward(self, x):
return self.network(x)
model = SimpleModel(input_dim=20)
# 3. 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.5)
# 4. 训练函数
def train_epoch(model, train_loader, criterion, optimizer, device):
"""训练一个 epoch"""
model.train()
total_loss = 0.0
for batch_X, batch_y in train_loader:
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)
# 前向传播
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
# 反向传播
optimizer.zero_grad() # 清零梯度
loss.backward() # 计算梯度
optimizer.step() # 更新参数
total_loss += loss.item() * batch_X.size(0)
return total_loss / len(train_loader.dataset)
# 5. 验证函数
def validate(model, val_loader, criterion, device):
"""验证模型"""
model.eval()
total_loss = 0.0
with torch.no_grad(): # 不计算梯度
for batch_X, batch_y in val_loader:
batch_X = batch_X.to(device)
batch_y = batch_y.to(device)
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
total_loss += loss.item() * batch_X.size(0)
return total_loss / len(val_loader.dataset)
# 6. 完整训练循环
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
num_epochs = 100
best_val_loss = float('inf')
for epoch in range(num_epochs):
# 训练
train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
# 验证
val_loss = validate(model, val_loader, criterion, device)
# 更新学习率
scheduler.step()
# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': val_loss,
}, 'best_model.pth')
# 打印训练信息
if (epoch + 1) % 10 == 0:
current_lr = optimizer.param_groups[0]['lr']
print(f'Epoch [{epoch+1}/{num_epochs}] '
f'Train Loss: {train_loss:.4f} '
f'Val Loss: {val_loss:.4f} '
f'LR: {current_lr:.6f}')
print(f'训练完成!最佳验证损失: {best_val_loss:.4f}')
训练循环的关键步骤
理解训练循环中每个步骤的作用非常重要:
for batch in train_loader:
# 步骤 1:清零梯度
optimizer.zero_grad()
# 步骤 2:前向传播
outputs = model(inputs)
# 步骤 3:计算损失
loss = criterion(outputs, targets)
# 步骤 4:反向传播(计算梯度)
loss.backward()
# 步骤 5:更新参数
optimizer.step()
为什么要 zero_grad()?
PyTorch 默认会累积梯度(适用于 RNN 等场景),但大多数情况下我们需要每个 batch 重新计算梯度,所以必须手动清零:
# 如果不清零梯度
for batch in train_loader:
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward() # 梯度会累积到之前的梯度上
optimizer.step()
# 结果:梯度越来越大,训练不稳定
# 正确做法
for batch in train_loader:
optimizer.zero_grad() # 清零梯度
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
混合精度训练
混合精度训练(Mixed Precision Training)使用 16 位浮点数(FP16 或 BF16)进行部分计算,可以显著提升训练速度并减少显存占用。
为什么使用混合精度?
传统的深度学习训练使用 32 位浮点数(FP32),但这存在两个问题:
- 显存占用大:FP32 每个数占 4 字节
- 计算速度慢:现代 GPU 对低精度计算有专门优化
混合精度训练使用 FP16 进行大部分计算,但对数值敏感的操作仍使用 FP32,从而在保持精度的同时获得性能提升。
PyTorch AMP 使用方法
PyTorch 的 torch.amp 模块(Automatic Mixed Precision)让混合精度训练变得简单:
import torch
from torch.amp import autocast, GradScaler
# 创建梯度缩放器(仅在使用 FP16 时需要)
scaler = GradScaler('cuda')
model = model.cuda()
optimizer = optim.Adam(model.parameters(), lr=0.001)
for epoch in range(num_epochs):
for inputs, targets in train_loader:
inputs = inputs.cuda()
targets = targets.cuda()
optimizer.zero_grad()
# 使用 autocast 进行混合精度前向传播
with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)
# 使用 scaler 进行反向传播
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
autocast 的作用
autocast 上下文管理器会自动选择每个操作的数据类型:
- 适合 FP16 的操作:卷积、矩阵乘法等(速度快)
- 需要 FP32 的操作:softmax、损失函数等(数值稳定)
from torch.amp import autocast
# CUDA 设备
with autocast(device_type='cuda', dtype=torch.float16):
output = model(input)
# CPU 设备(使用 bfloat16)
with autocast(device_type='cpu', dtype=torch.bfloat16):
output = model(input)
GradScaler 的作用
FP16 的数值范围有限,小梯度可能变为 0(下溢)。GradScaler 通过缩放损失来防止这种情况:
scaler = GradScaler(
device='cuda',
init_scale=2**16, # 初始缩放因子
growth_factor=2.0, # 无溢出时的增长倍数
backoff_factor=0.5, # 溢出时的衰减倍数
growth_interval=2000 # 每隔多少步尝试增大缩放因子
)
# 训练步骤
optimizer.zero_grad()
with autocast(device_type='cuda'):
loss = model(input)
# 缩放损失,反向传播
scaler.scale(loss).backward()
# 取消缩放梯度,更新参数
scaler.step(optimizer)
# 更新缩放因子
scaler.update()
BF16 vs FP16
| 特性 | FP16 | BF16 |
|---|---|---|
| 数值范围 | ~ | ~ |
| 精度 | 高 | 低 |
| 硬件支持 | 广泛 | Ampere 及更新 GPU |
| 溢出风险 | 高 | 低 |
| 需要 GradScaler | 是 | 否 |
推荐:
- 如果 GPU 支持 BF16(Ampere 及更新),优先使用 BF16
- 使用 BF16 时可以不需要 GradScaler
# 使用 BF16(不需要 GradScaler)
with autocast(device_type='cuda', dtype=torch.bfloat16):
output = model(input)
loss = criterion(output, target)
loss.backward()
optimizer.step()
混合精度训练最佳实践
import torch
import torch.nn as nn
import torch.optim as optim
from torch.amp import autocast, GradScaler
from torch.utils.data import DataLoader
def train_with_amp(model, train_loader, num_epochs):
device = torch.device('cuda')
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.001)
# 使用 FP16
scaler = GradScaler('cuda')
use_amp = True
for epoch in range(num_epochs):
model.train()
for inputs, targets in train_loader:
inputs = inputs.to(device)
targets = targets.to(device)
optimizer.zero_grad()
if use_amp:
# 混合精度训练
with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
else:
# 普通 FP32 训练
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
return model
性能对比
在 NVIDIA A100 GPU 上,混合精度训练通常可以获得:
- 速度提升:1.5x - 3x
- 显存节省:约 50%
- 精度损失:通常可忽略
torch.compile 加速训练
PyTorch 2.0 引入了 torch.compile,可以在不修改代码的情况下显著加速模型训练。
基本用法
import torch
model = MyModel()
# 一行代码编译模型
model = torch.compile(model)
# 正常使用,无需修改其他代码
output = model(input)
编译模式
torch.compile 支持多种编译模式,平衡编译时间和执行效率:
# 默认模式:平衡编译时间和执行效率
model = torch.compile(model)
# 最大优化模式:编译时间更长,运行更快
model = torch.compile(model, mode='max-autotune')
# 减少内存模式:降低内存占用
model = torch.compile(model, mode='reduce-overhead')
# 默认模式(显式指定)
model = torch.compile(model, mode='default')
编译特定部分
可以只编译模型的特定部分:
class MyModel(nn.Module):
def __init__(self):
super().__init__()
self.backbone = ResNet()
self.head = nn.Linear(512, 10)
# 只编译骨干网络
self.backbone = torch.compile(self.backbone)
def forward(self, x):
x = self.backbone(x)
x = self.head(x)
return x
动态形状支持
对于输入形状变化的模型:
# 支持动态形状
model = torch.compile(model, dynamic=True)
torch.compile 与 AMP 结合
import torch
from torch.amp import autocast, GradScaler
model = torch.compile(model)
scaler = GradScaler('cuda')
for inputs, targets in train_loader:
optimizer.zero_grad()
with autocast(device_type='cuda', dtype=torch.float16):
outputs = model(inputs)
loss = criterion(outputs, targets)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
编译优化器
PyTorch 2.2+ 支持编译优化器,进一步提升性能:
import torch
model = MyModel().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# 编译模型和优化器
model = torch.compile(model)
optimizer = torch.compile(optimizer)
性能优化技巧
- 首次运行较慢:编译需要时间,首次运行会较慢,后续运行会很快
- 避免频繁改变输入形状:这会触发重新编译
- 使用
mode='reduce-overhead'减少内存占用 - 对于大模型效果更明显:小模型可能看不出明显加速
高级训练技巧
梯度裁剪
梯度裁剪可以防止梯度爆炸,稳定训练过程:
import torch.nn.utils as utils
# 训练循环中
for inputs, targets in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
# 梯度裁剪:限制梯度的最大范数
max_grad_norm = 1.0
utils.clip_grad_norm_(model.parameters(), max_grad_norm)
optimizer.step()
何时使用梯度裁剪:
- RNN/LSTM 训练(梯度爆炸常见)
- 训练不稳定时
- 使用大学习率时
梯度累积
当 GPU 显存不足,无法使用大 batch size 时,可以使用梯度累积:
accumulation_steps = 4 # 累积 4 个 batch 后更新
optimizer.zero_grad()
for i, (inputs, targets) in enumerate(train_loader):
outputs = model(inputs)
loss = criterion(outputs, targets)
# 缩放损失
loss = loss / accumulation_steps
loss.backward()
# 累积够步数后更新参数
if (i + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
等效 batch size:实际 batch size = batch_size × accumulation_steps
早停法(Early Stopping)
防止过拟合,在验证损失不再改善时停止训练:
class EarlyStopping:
def __init__(self, patience=7, min_delta=0):
self.patience = patience
self.min_delta = min_delta
self.counter = 0
self.best_loss = None
self.early_stop = False
def __call__(self, val_loss):
if self.best_loss is None:
self.best_loss = val_loss
elif val_loss > self.best_loss - self.min_delta:
self.counter += 1
if self.counter >= self.patience:
self.early_stop = True
else:
self.best_loss = val_loss
self.counter = 0
# 使用早停
early_stopping = EarlyStopping(patience=10, min_delta=0.001)
for epoch in range(num_epochs):
train_loss = train_epoch(model, train_loader, criterion, optimizer)
val_loss = validate(model, val_loader, criterion)
early_stopping(val_loss)
if early_stopping.early_stop:
print(f"早停于 epoch {epoch}")
break
模型权重平均(EMA)
指数移动平均可以使模型更加稳定:
class EMA:
def __init__(self, model, decay=0.999):
self.model = model
self.decay = decay
self.shadow = {}
self.backup = {}
# 初始化影子参数
for name, param in model.named_parameters():
if param.requires_grad:
self.shadow[name] = param.data.clone()
def update(self):
"""更新影子参数"""
for name, param in self.model.named_parameters():
if param.requires_grad:
new_average = self.decay * self.shadow[name] + (1 - self.decay) * param.data
self.shadow[name] = new_average.clone()
def apply_shadow(self):
"""应用影子参数(用于评估)"""
for name, param in self.model.named_parameters():
if param.requires_grad:
self.backup[name] = param.data.clone()
param.data = self.shadow[name]
def restore(self):
"""恢复原始参数"""
for name, param in self.model.named_parameters():
if param.requires_grad:
param.data = self.backup[name]
self.backup = {}
# 使用 EMA
ema = EMA(model, decay=0.999)
for epoch in range(num_epochs):
train_epoch(model, train_loader, criterion, optimizer)
ema.update() # 每个 epoch 更新
# 验证时使用 EMA 权重
ema.apply_shadow()
val_loss = validate(model, val_loader, criterion)
ema.restore()
学习率预热(Warmup)
训练开始时使用小学习率,逐步增加到目标学习率:
def get_lr_warmup_scheduler(optimizer, warmup_epochs, target_lr):
"""简单的学习率预热"""
def lr_lambda(epoch):
if epoch < warmup_epochs:
return (epoch + 1) / warmup_epochs
return 1.0
return optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
scheduler = get_lr_warmup_scheduler(optimizer, warmup_epochs=5, target_lr=0.001)
权重初始化
良好的权重初始化可以加速收敛:
def init_weights(m):
"""权重初始化函数"""
if isinstance(m, nn.Linear):
nn.init.xavier_uniform_(m.weight) # Xavier 初始化
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
# 应用初始化
model.apply(init_weights)
常用初始化方法:
xavier_uniform_:适用于 tanh、sigmoid 激活函数kaiming_normal_:适用于 ReLU 激活函数trunc_normal_:适用于 Transformer
训练调试技巧
监控梯度
检查梯度可以帮助诊断训练问题:
def monitor_gradients(model):
"""监控模型梯度统计信息"""
for name, param in model.named_parameters():
if param.grad is not None:
grad_norm = param.grad.norm().item()
grad_max = param.grad.max().item()
grad_min = param.grad.min().item()
print(f"{name}: norm={grad_norm:.6f}, "
f"max={grad_max:.6f}, min={grad_min:.6f}")
# 在训练中调用
loss.backward()
monitor_gradients(model)
检查模型输出
def check_model_output(model, inputs):
"""检查模型输出是否正常"""
model.eval()
with torch.no_grad():
outputs = model(inputs)
print(f"输出形状: {outputs.shape}")
print(f"输出范围: [{outputs.min().item():.4f}, {outputs.max().item():.4f}]")
print(f"输出均值: {outputs.mean().item():.4f}")
print(f"输出标准差: {outputs.std().item():.4f}")
# 检查是否有 NaN 或 Inf
if torch.isnan(outputs).any():
print("警告:输出包含 NaN!")
if torch.isinf(outputs).any():
print("警告:输出包含 Inf!")
过拟合单个 batch
在开始完整训练前,先验证模型能否过拟合单个 batch:
# 获取单个 batch
single_batch = next(iter(train_loader))
inputs, targets = single_batch
# 训练 100 个 epoch,看损失是否降到接近 0
for epoch in range(100):
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
if epoch % 10 == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
# 如果损失降到接近 0,说明模型有学习能力
# 如果损失不下降,说明模型或数据有问题
学习率范围测试
找到合适的学习率范围:
def find_lr(model, train_loader, criterion, optimizer,
init_value=1e-8, final_value=10, num_iter=100):
"""学习率范围测试"""
model.train()
# 设置初始学习率
for param_group in optimizer.param_groups:
param_group['lr'] = init_value
# 计算学习率增长因子
mult = (final_value / init_value) ** (1 / num_iter)
losses = []
lrs = []
best_loss = float('inf')
iterator = iter(train_loader)
for i in range(num_iter):
try:
inputs, targets = next(iterator)
except StopIteration:
iterator = iter(train_loader)
inputs, targets = next(iterator)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
losses.append(loss.item())
lrs.append(optimizer.param_groups[0]['lr'])
# 更新学习率
for param_group in optimizer.param_groups:
param_group['lr'] *= mult
# 如果损失爆炸,提前停止
if loss.item() > best_loss * 10:
break
best_loss = min(best_loss, loss.item())
return lrs, losses
# 使用
lrs, losses = find_lr(model, train_loader, criterion, optimizer)
# 绘制学习率-损失曲线,找到损失快速下降的学习率
常见问题排查
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 损失是 NaN | 学习率过大、梯度爆炸 | 降低学习率、梯度裁剪 |
| 损失不下降 | 学习率过小、模型问题 | 增大学习率、检查模型 |
| 训练震荡 | 学习率过大 | 降低学习率、使用动量 |
| 过拟合 | 模型太复杂、数据太少 | 正则化、数据增强、早停 |
| 欠拟合 | 模型太简单、训练不够 | 增加模型容量、延长训练 |
断点续训
训练过程可能因各种原因中断,需要支持断点续训:
import os
def save_checkpoint(model, optimizer, scheduler, epoch, loss, path):
"""保存检查点"""
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict() if scheduler else None,
'loss': loss,
}, path)
def load_checkpoint(model, optimizer, scheduler, path):
"""加载检查点"""
if os.path.exists(path):
checkpoint = torch.load(path)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
if scheduler and checkpoint['scheduler_state_dict']:
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
return checkpoint['epoch'], checkpoint['loss']
return 0, float('inf')
# 使用示例
checkpoint_path = 'checkpoint.pth'
start_epoch, best_loss = load_checkpoint(model, optimizer, scheduler, checkpoint_path)
for epoch in range(start_epoch, num_epochs):
train_loss = train_epoch(model, train_loader, criterion, optimizer)
val_loss = validate(model, val_loader, criterion)
scheduler.step()
# 定期保存检查点
if (epoch + 1) % 5 == 0:
save_checkpoint(model, optimizer, scheduler, epoch + 1, val_loss, checkpoint_path)
# 保存最佳模型
if val_loss < best_loss:
best_loss = val_loss
torch.save(model.state_dict(), 'best_model.pth')
小结
本章我们深入学习了 PyTorch 模型训练的完整流程:
- 损失函数:回归任务使用 MSE/L1/SmoothL1,分类任务使用 CrossEntropy/BCE
- 优化器:SGD 适合追求泛化,Adam 适合快速收敛,AdamW 适合 Transformer
- 学习率调度:余弦退火效果好,OneCycleLR 自动化程度高
- 训练循环:理解 zero_grad、backward、step 的作用
- 混合精度训练:使用 torch.amp 加速训练、节省显存
- torch.compile:一行代码加速模型
- 高级技巧:梯度裁剪、梯度累积、早停、EMA
- 调试方法:监控梯度、过拟合单 batch、学习率范围测试
练习
- 实现一个完整的分类任务训练流程,包含训练、验证、测试
- 尝试不同的优化器(SGD、Adam、AdamW),比较收敛速度和最终效果
- 使用混合精度训练,对比 FP32 训练的速度和显存占用
- 实现早停法和学习率预热
- 编写一个完整的断点续训功能