神经网络 (Neural Network)
本章将介绍如何使用 PyTorch 的 nn.Module 构建神经网络,这是定义深度学习模型的标准方式。我们将从基础的层定义开始,逐步深入到自定义层、自定义损失函数、注意力机制等高级主题。
nn.Module 基础
torch.nn.Module 是 PyTorch 中所有神经网络的基类。它提供了一种组织模型结构、管理参数和处理输入输出的标准方式。理解 nn.Module 的工作原理是构建复杂模型的基础。
创建一个简单的神经网络
定义神经网络需要继承 nn.Module 类,并实现两个核心方法:__init__ 用于初始化层和参数,forward 用于定义前向传播逻辑。
import torch
import torch.nn as nn


class SimpleNet(nn.Module):
    """A minimal MLP classifier: 784 -> 256 -> ReLU -> 10 logits."""

    def __init__(self):
        # Initialise the nn.Module machinery (mandatory for subclasses).
        super().__init__()
        self.fc1 = nn.Linear(784, 256)  # fully connected: 784 in, 256 out
        self.relu = nn.ReLU()           # non-linearity
        self.fc2 = nn.Linear(256, 10)   # output layer: 10 classes

    def forward(self, x):
        # Forward pass: fc1 -> ReLU -> fc2; returns raw logits (no softmax).
        return self.fc2(self.relu(self.fc1(x)))


# Instantiate and inspect the model.
model = SimpleNet()
print(model)

# Push one dummy sample through (a 28x28 image flattened to 784 values).
x = torch.randn(1, 784)
output = model(x)
print(f"输出形状: {output.shape}")  # torch.Size([1, 10])
nn.Module 的核心特性
nn.Module 提供了以下核心功能,理解这些特性对于正确使用 PyTorch 至关重要:
参数自动追踪:所有通过 nn.Parameter 定义的参数或作为属性的 nn.Module 子模块都会被自动追踪。
# Walk every registered parameter together with its dotted name.
for name, param in model.named_parameters():
    print(f"{name}: {param.shape}")

# Parameter counts: all of them vs. only those that receive gradients.
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")
设备管理:模型可以轻松地在 CPU 和 GPU 之间移动。
# Prefer the GPU when one is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
训练/评估模式:通过 train() 和 eval() 方法切换模式,影响 Dropout、BatchNorm 等层的行为。
model.train()  # training mode: Dropout active, BatchNorm uses batch statistics
model.eval()   # evaluation mode: Dropout off, BatchNorm uses running statistics
模块的层次结构
nn.Module 支持嵌套,可以构建复杂的层次结构。理解 modules() 和 children() 的区别很重要:
import torch.nn as nn


class Block(nn.Module):
    """Linear -> BatchNorm1d -> ReLU bundled into one reusable unit."""

    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.norm = nn.BatchNorm1d(out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        out = self.linear(x)
        out = self.norm(out)
        return self.activation(out)


class MyModel(nn.Module):
    """Two stacked Blocks followed by a linear classification head."""

    def __init__(self):
        super().__init__()
        self.block1 = Block(784, 256)
        self.block2 = Block(256, 64)
        self.head = nn.Linear(64, 10)

    def forward(self, x):
        return self.head(self.block2(self.block1(x)))


model = MyModel()

# children() yields only the immediate submodules.
print("直接子模块 (children):")
for name, child in model.named_children():
    print(f" {name}: {child.__class__.__name__}")

# modules() walks the whole tree, including the model itself.
print("\n所有模块 (modules):")
for module in model.modules():
    print(f" {module.__class__.__name__}")
常用网络层
全连接层 (Linear)
全连接层(也称为线性层或密集层)是神经网络中最基本的层,执行 $y = xW^T + b$ 的仿射变换。
import torch  # fix: torch.randn is used below, but torch was never imported here
import torch.nn as nn

# nn.Linear(in_features, out_features, bias=True)
linear = nn.Linear(100, 50)

# Shapes: input (batch_size, in_features) -> output (batch_size, out_features).
x = torch.randn(32, 100)  # batch_size=32, 100 features each
y = linear(x)
print(f"输出形状: {y.shape}")  # torch.Size([32, 50])

# The learnable weight is stored transposed: (out_features, in_features).
print(f"权重形状: {linear.weight.shape}")  # [50, 100]
print(f"偏置形状: {linear.bias.shape}")  # [50]
参数解释:
in_features:输入特征维度,每个输入样本的大小;out_features:输出特征维度,每个输出样本的大小;bias:是否添加偏置(默认为 True)
激活函数层
激活函数为神经网络引入非线性,使其能够学习复杂的模式。没有激活函数,多层神经网络等价于单层线性变换。
import torch
import torch.nn as nn

# ReLU: f(x) = max(0, x) — the default choice for hidden layers.
relu = nn.ReLU()
print(relu(torch.tensor([-1.0, 0.0, 1.0])))  # tensor([0., 0., 1.])

# ReLU6: like ReLU but clipped to the range [0, 6].
relu6 = nn.ReLU6()

# LeakyReLU: a small negative slope avoids "dead" neurons.
leaky_relu = nn.LeakyReLU(negative_slope=0.01)
print(leaky_relu(torch.tensor([-1.0, 0.0, 1.0])))  # tensor([-0.01, 0., 1.])

# PReLU: a LeakyReLU whose negative slope is learned.
prelu = nn.PReLU()

# ELU: exponential linear unit.
elu = nn.ELU(alpha=1.0)

# GELU: Gaussian error linear unit, the Transformer staple.
gelu = nn.GELU()

# SELU: self-normalising activation.
selu = nn.SELU()

# Sigmoid: f(x) = 1 / (1 + exp(-x)); output in (0, 1), used for binary tasks.
sigmoid = nn.Sigmoid()

# Tanh: output in (-1, 1).
tanh = nn.Tanh()

# Softmax along a dimension: non-negative outputs summing to 1.
softmax = nn.Softmax(dim=1)

# LogSoftmax: numerically stabler log of Softmax (pairs with NLLLoss).
log_softmax = nn.LogSoftmax(dim=1)

# SiLU (a.k.a. Swish): self-gated, often outperforms ReLU.
silu = nn.SiLU()
激活函数选择建议:
- 隐藏层:ReLU 是最常用的选择;对于深层网络,考虑 GELU 或 Swish
- 输出层:二分类用 Sigmoid,多分类用 Softmax(或通过 CrossEntropyLoss 内置)
- RNN/LSTM:Tanh 通常表现更好
Dropout 层
Dropout 是一种正则化技术,在训练时随机将部分神经元输出置零,防止神经元过度依赖特定特征,从而减少过拟合。
import torch  # fix: needed for torch.randn below — the original snippet omitted it
import torch.nn as nn

# Standard dropout: each element is zeroed with probability p during training.
dropout = nn.Dropout(p=0.5)
x = torch.randn(1, 10)
y = dropout(x)
print(f"Dropout 前: {x}")
print(f"Dropout 后: {y}")  # 约一半元素变为 0,其余元素放大 2 倍

# AlphaDropout: preserves the self-normalising property (use with SELU).
alpha_dropout = nn.AlphaDropout(p=0.5)

# Dropout2d: zeroes whole channels — for conv feature maps.
dropout2d = nn.Dropout2d(p=0.5)

# Dropout3d: channel dropout for 3D convolutions.
dropout3d = nn.Dropout3d(p=0.5)
# Dropout only fires in training mode; in eval mode it is the identity.
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(20, 5),
)
model.train()  # training mode: dropout active
model.eval()   # evaluation mode: dropout disabled
注意:Dropout 在训练时会对保留的神经元输出进行缩放(乘以 1/(1-p)),以保持期望值不变。在评估模式下,Dropout 层直接传递输入。
BatchNorm 层
BatchNorm(批归一化)通过对每批数据的每个特征进行归一化,使均值为 0、方差为 1,然后通过可学习的缩放和偏移参数恢复表达能力。这可以加速训练、允许使用更大的学习率,并有一定的正则化效果。
import torch  # fix: torch.randn is used below, but torch was never imported here
import torch.nn as nn

# BatchNorm1d: for (N, C) or (N, C, L) inputs — fully connected nets.
bn1d = nn.BatchNorm1d(num_features=100)

# BatchNorm2d: for (N, C, H, W) inputs — conv nets.
bn2d = nn.BatchNorm2d(num_features=64)

# BatchNorm3d: for (N, C, D, H, W) inputs — 3D conv nets.
bn3d = nn.BatchNorm3d(num_features=64)

# In training mode the output is normalised with the current batch statistics.
x = torch.randn(32, 100)  # batch=32, features=100
y = bn1d(x)
print(f"BatchNorm 后均值: {y.mean(dim=0).abs().max().item():.6f}")  # 接近0
print(f"BatchNorm 后标准差: {y.std(dim=0).mean().item():.6f}")  # 接近1

# Two learnable affine parameters per feature.
print(f"缩放参数 gamma: {bn1d.weight.shape}")  # [100]
print(f"偏移参数 beta: {bn1d.bias.shape}")  # [100]

# Plus two running statistics (buffers, not parameters).
print(f"运行均值: {bn1d.running_mean.shape}")  # [100]
print(f"运行方差: {bn1d.running_var.shape}")  # [100]
训练与推理的区别:
- 训练时:使用当前 batch 的均值和方差进行归一化,并更新运行均值和方差
- 推理时:使用训练过程中累积的运行均值和方差
LayerNorm vs BatchNorm:
LayerNorm(层归一化)是另一种归一化方式,在每个样本的所有特征上进行归一化,而不是在 batch 维度上:
import torch  # fix: torch.randn is used below, but torch was never imported here
import torch.nn as nn

# LayerNorm normalises over the trailing feature dim(s) of every sample,
# independent of batch size — the Transformer default. For the input below,
# normalized_shape could also be (10, 768) to cover the last two dims.
ln = nn.LayerNorm(normalized_shape=768)
x = torch.randn(32, 10, 768)  # batch=32, seq_len=10, hidden=768
y = ln(x)
LayerNorm 的优势是不依赖 batch size,适合小 batch 或变长序列场景。
使用 nn.Sequential 快速构建
对于简单的顺序网络,可以使用 nn.Sequential 快速构建,无需定义 forward 方法:
import torch.nn as nn

# Option 1: positional layers (auto-named "0", "1", ...).
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 10),
)

# Option 2: an OrderedDict gives every layer an explicit name.
from collections import OrderedDict
model = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(784, 256)),
    ('relu1', nn.ReLU()),
    ('dropout', nn.Dropout(0.2)),
    ('fc2', nn.Linear(256, 10)),
]))

# Layers are reachable by name or by index.
print(model.fc1)
print(model[0])

# add_module appends another named layer to the container.
model.add_module('output', nn.Softmax(dim=1))
自定义层
当内置层无法满足需求时,可以通过继承 nn.Module 创建自定义层。自定义层需要正确管理参数和实现前向传播。
无参数的自定义层
如果层不需要可学习参数,实现非常简单:
import torch
import torch.nn as nn


class CenteredLayer(nn.Module):
    """Parameter-free layer that subtracts the global mean of its input."""

    def __init__(self):
        super().__init__()

    def forward(self, X):
        # Shift so the output has zero mean.
        return X - X.mean()


layer = CenteredLayer()
x = torch.FloatTensor([1, 2, 3, 4, 5])
print(layer(x))  # tensor([-2., -1., 0., 1., 2.])
带参数的自定义层
使用 nn.Parameter 定义可学习参数,这些参数会被自动追踪:
import torch
import torch.nn as nn


class MyLinear(nn.Module):
    """Hand-rolled fully connected layer computing y = x @ W^T + b."""

    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        # Wrapping in nn.Parameter registers the tensors as trainable weights.
        self.weight = nn.Parameter(torch.randn(out_features, in_features))
        self.bias = nn.Parameter(torch.randn(out_features))

    def forward(self, x):
        # Affine transform, same convention as nn.Linear.
        return torch.matmul(x, self.weight.t()) + self.bias

    def extra_repr(self):
        # Extra text shown inside print(module) / repr(module).
        return f'in_features={self.in_features}, out_features={self.out_features}'


linear = MyLinear(10, 5)
print(linear)  # includes the extra_repr text

x = torch.randn(2, 10)
y = linear(x)
print(f"输出形状: {y.shape}")

# Both tensors were registered automatically.
print(f"参数数量: {sum(p.numel() for p in linear.parameters())}")
带缓冲区(Buffer)的自定义层
除了可训练参数,有时还需要存储不可训练的状态(如 BatchNorm 的运行统计量)。使用 register_buffer 注册缓冲区:
import torch
import torch.nn as nn


class RunningNorm(nn.Module):
    """Normalisation with running statistics (like BatchNorm, no learnable affine).

    Training mode normalises with the current batch statistics and updates the
    running estimates; eval mode normalises with the accumulated running stats.
    """

    def __init__(self, num_features, momentum=0.1):
        super().__init__()
        self.momentum = momentum
        # Buffers: excluded from parameters()/gradients, but stored in
        # state_dict() and moved together with the module across devices.
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)
            # Bug fix: update the running stats outside autograd; otherwise the
            # buffers would retain the whole computation graph of every step
            # whenever the input requires grad.
            with torch.no_grad():
                self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
                self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
            return (x - mean) / torch.sqrt(var + 1e-5)
        else:
            # Inference: normalise with the accumulated statistics.
            return (x - self.running_mean) / torch.sqrt(self.running_var + 1e-5)


# Demo: one training step, then inspect what the module tracks.
norm = RunningNorm(10)
norm.train()
x = torch.randn(32, 10)
y = norm(x)

# Buffers do not appear in parameters(), but do appear in buffers()/state_dict().
print(f"参数: {list(norm.parameters())}")  # 空
print(f"缓冲区: {list(norm.buffers())}")  # [running_mean, running_var]
参数 vs 缓冲区:
nn.Parameter:可训练参数,会计算梯度,出现在 model.parameters() 中;register_buffer:不可训练状态,不计算梯度,出现在 model.buffers() 中,但会随模型移动设备
复合自定义层
自定义层可以组合其他层,实现复杂的逻辑:
import torch
import torch.nn as nn
import torch.nn.functional as F


class SqueezeExcitation(nn.Module):
    """Squeeze-and-Excitation: reweight channels using globally pooled context."""

    def __init__(self, channels, reduction=16):
        super().__init__()
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        # Bottleneck MLP that maps pooled stats to per-channel gates in (0, 1).
        self.excitation = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        batch, channels = x.size(0), x.size(1)
        # Squeeze: one scalar per channel via global average pooling.
        gates = self.squeeze(x).view(batch, channels)
        # Excitation: learn the channel weights.
        gates = self.excitation(gates).view(batch, channels, 1, 1)
        # Scale: broadcast-multiply the gates back onto the feature map.
        return x * gates.expand_as(x)


se = SqueezeExcitation(channels=64)
x = torch.randn(8, 64, 32, 32)
y = se(x)
print(f"输出形状: {y.shape}")
自定义损失函数
除了内置的损失函数(如 nn.CrossEntropyLoss、nn.MSELoss),有时需要自定义损失函数来满足特定需求。
作为函数的损失
最简单的方式是定义一个函数:
import torch
import torch.nn.functional as F


def dice_loss(pred, target, smooth=1.0):
    """Dice loss for segmentation: 1 - 2|A∩B| / (|A| + |B|).

    pred holds raw logits (sigmoid is applied here); target holds 0/1 values
    of the same shape. `smooth` avoids division by zero.
    """
    probs = torch.sigmoid(pred).view(-1)
    truth = target.view(-1)
    overlap = (probs * truth).sum()
    dice_coeff = (2.0 * overlap + smooth) / (probs.sum() + truth.sum() + smooth)
    return 1.0 - dice_coeff


pred = torch.randn(4, 1, 32, 32, requires_grad=True)
target = torch.randint(0, 2, (4, 1, 32, 32)).float()
loss = dice_loss(pred, target)
loss.backward()
作为模块的损失
如果损失函数需要可学习参数,可以定义为 nn.Module:
import torch
import torch.nn as nn
import torch.nn.functional as F  # bug fix: F was used below but never imported


class FocalLoss(nn.Module):
    """
    Focal Loss for class-imbalanced classification.

    FL(p_t) = -alpha * (1 - p_t)^gamma * log(p_t)

    gamma: focusing parameter — down-weights easy, well-classified samples.
    alpha: scalar weight applied to every sample's loss.
    Bug fix: the previous binary-style term ``alpha * t + (1 - alpha) * (1 - t)``
    is only meaningful for 0/1 targets; with multi-class index targets (as in
    the demo below, classes 0..9) it produced negative per-sample weights.
    """

    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction  # 'mean' | 'sum' | anything else = no reduction

    def forward(self, inputs, targets):
        # inputs: [N, C] raw logits; targets: [N] class indices.
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # probability assigned to the true class
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


criterion = FocalLoss(alpha=0.25, gamma=2.0)
pred = torch.randn(8, 10)
target = torch.randint(0, 10, (8,))
loss = criterion(pred, target)
print(f"Focal Loss: {loss.item()}")
组合多个损失
在多任务学习中,经常需要组合多个损失函数:
import torch
import torch.nn as nn


class MultiTaskLoss(nn.Module):
    """Weighted sum of per-task cross-entropy losses.

    Args:
        num_tasks: number of tasks (one CrossEntropyLoss per task).
        weights: optional per-task weights (sequence or tensor); defaults to 1.
    """

    def __init__(self, num_tasks, weights=None):
        super().__init__()
        if weights is None:
            weights = torch.ones(num_tasks)
        # Bug fix: register_buffer() requires a Tensor, but callers naturally
        # pass a plain list (the demo below uses [1.0, 0.5]) — coerce first.
        weights = torch.as_tensor(weights, dtype=torch.float32)
        self.register_buffer('weights', weights)
        self.task_losses = nn.ModuleList([
            nn.CrossEntropyLoss() for _ in range(num_tasks)
        ])

    def forward(self, predictions, targets):
        """
        predictions: list of tensors, 每个任务的预测
        targets: list of tensors, 每个任务的目标
        """
        total_loss = 0.0
        for weight, pred, target, loss_fn in zip(self.weights, predictions, targets, self.task_losses):
            total_loss = total_loss + weight * loss_fn(pred, target)
        return total_loss


criterion = MultiTaskLoss(num_tasks=2, weights=[1.0, 0.5])
pred1 = torch.randn(8, 10)
pred2 = torch.randn(8, 5)
target1 = torch.randint(0, 10, (8,))
target2 = torch.randint(0, 5, (8,))
loss = criterion([pred1, pred2], [target1, target2])
print(f"多任务损失: {loss.item()}")
注意力机制
注意力机制是现代深度学习的核心技术,尤其在 Transformer 架构中广泛应用。PyTorch 提供了 nn.MultiheadAttention 模块,同时也支持手动实现。
使用 nn.MultiheadAttention
PyTorch 内置的 nn.MultiheadAttention 实现了标准的多头注意力机制:
import torch
import torch.nn as nn

embed_dim = 512  # total embedding width
num_heads = 8    # parallel heads, each of size embed_dim // num_heads = 64

mha = nn.MultiheadAttention(
    embed_dim=embed_dim,
    num_heads=num_heads,
    dropout=0.1,
    bias=True,
    batch_first=True,  # tensors are laid out as (batch, seq, feature)
)

batch_size = 4
seq_len = 10
query = torch.randn(batch_size, seq_len, embed_dim)
key = torch.randn(batch_size, seq_len, embed_dim)
value = torch.randn(batch_size, seq_len, embed_dim)

# Returns the attended values plus the (head-averaged) attention weights.
attn_output, attn_weights = mha(query, key, value)
print(f"输出形状: {attn_output.shape}")  # [4, 10, 512]
print(f"注意力权重形状: {attn_weights.shape}")  # [4, 10, 10]

# Self-attention is simply query == key == value.
self_attn_output, _ = mha(query, query, query)
参数说明:
embed_dim:总嵌入维度,会被 num_heads 平分;num_heads:并行注意力头数,每个头的维度为 embed_dim // num_heads;dropout:注意力权重的 dropout 概率;batch_first:输入形状为 (batch, seq, feature) 还是 (seq, batch, feature)
因果注意力(自回归)
在语言模型等自回归场景中,需要使用因果掩码防止看到未来信息:
import torch
import torch.nn as nn

embed_dim = 256
num_heads = 4
seq_len = 10

mha = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

# Causal mask: True marks positions that must NOT be attended to
# (the strictly upper triangle = "the future").
causal_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
print("因果掩码:")
print(causal_mask.int())
# [[0, 1, 1, 1, ...],
#  [0, 0, 1, 1, ...],
#  [0, 0, 0, 1, ...],
#  ...]

query = torch.randn(2, seq_len, embed_dim)

# Bug fix: nn.MultiheadAttention raises "Need attn_mask if specify the
# is_causal hint" when is_causal=True is passed with no mask — is_causal is
# only a performance hint that the supplied attn_mask is causal (PyTorch 2.0+).
output, _ = mha(query, query, query, attn_mask=causal_mask, is_causal=True)
从零实现多头注意力
理解注意力机制的内部实现:
import torch
import torch.nn as nn
import math


class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention, written out by hand."""

    def __init__(self, embed_dim, num_heads, dropout=0.0):
        super().__init__()
        assert embed_dim % num_heads == 0, "embed_dim 必须能被 num_heads 整除"
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        # Separate projections for queries, keys, values, plus the output.
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, t, batch_size):
        # [batch, seq, embed] -> [batch, num_heads, seq, head_dim]
        return t.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, query, key, value, attn_mask=None):
        """query/key/value: [batch, seq_len, embed_dim];
        attn_mask: [seq_len, seq_len] or [batch, seq_len, seq_len], True = blocked."""
        batch_size, seq_len, _ = query.shape

        q = self._split_heads(self.q_proj(query), batch_size)
        k = self._split_heads(self.k_proj(key), batch_size)
        v = self._split_heads(self.v_proj(value), batch_size)

        # Scaled dot-product scores: [batch, heads, q_seq, k_seq].
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)
        if attn_mask is not None:
            # Blocked positions become -inf so softmax assigns them zero weight.
            scores = scores.masked_fill(attn_mask == True, float('-inf'))

        weights = self.dropout(torch.softmax(scores, dim=-1))

        # Weighted sum of values, then stitch the heads back together.
        context = torch.matmul(weights, v)
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)
        return self.out_proj(context), weights


# Exercise the hand-rolled implementation.
mha = MultiHeadAttention(embed_dim=512, num_heads=8)
query = torch.randn(2, 10, 512)
output, weights = mha(query, query, query)
print(f"输出形状: {output.shape}")
print(f"权重形状: {weights.shape}")
Transformer 编码器块
结合注意力机制和前馈网络构建完整的 Transformer 编码器块:
import torch
import torch.nn as nn


class TransformerEncoderBlock(nn.Module):
    """Post-LN Transformer encoder layer: self-attention then feed-forward,
    each wrapped in a residual add followed by LayerNorm."""

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        # Multi-head self-attention.
        self.self_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )
        # Position-wise feed-forward network.
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout),
        )
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, attn_mask=None):
        # Self-attention sub-layer: residual add, then LayerNorm (Post-LN).
        attn_out, _ = self.self_attn(x, x, x, attn_mask=attn_mask)
        x = self.norm1(x + self.dropout(attn_out))
        # Feed-forward sub-layer: residual add, then LayerNorm.
        return self.norm2(x + self.ffn(x))


block = TransformerEncoderBlock(embed_dim=512, num_heads=8, ff_dim=2048)
x = torch.randn(2, 10, 512)
output = block(x)
print(f"输出形状: {output.shape}")
模型钩子(Hooks)
钩子(Hooks)允许你在前向传播或反向传播过程中拦截和修改数据,这对于可视化、调试和实现复杂功能非常有用。
前向钩子
前向钩子可以在层的前向传播前后执行自定义操作:
import torch
import torch.nn as nn

model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 5),
)


def forward_hook(module, input, output):
    """Runs right after `module.forward`; `input` is a tuple of its args."""
    print(f"{module.__class__.__name__} 前向传播:")
    print(f" 输入形状: {input[0].shape}")
    print(f" 输出形状: {output.shape}")


# Hook only the first Linear layer; keep the handle for later removal.
hook_handle = model[0].register_forward_hook(forward_hook)

x = torch.randn(2, 10)
y = model(x)

# Detach the hook once we are done observing.
hook_handle.remove()
反向钩子
反向钩子可以在反向传播过程中访问梯度:
import torch
import torch.nn as nn

model = nn.Linear(10, 5)


def backward_hook(module, grad_input, grad_output):
    """Called during the backward pass through `module`."""
    print(f"{module.__class__.__name__} 反向传播:")
    print(f" 输入梯度: {[g.shape for g in grad_input if g is not None]}")
    print(f" 输出梯度: {[g.shape for g in grad_output if g is not None]}")


# Fix: register_backward_hook is deprecated and can report incorrect
# grad_input for some modules; register_full_backward_hook is the
# supported replacement (PyTorch >= 1.8).
hook_handle = model.register_full_backward_hook(backward_hook)

# Forward pass, then trigger the backward pass to fire the hook.
x = torch.randn(2, 10)
y = model(x)
loss = y.sum()
loss.backward()

hook_handle.remove()
实用示例:特征提取
使用钩子提取中间层特征:
import torch
import torch.nn as nn


class FeatureExtractor:
    """Capture the outputs of selected submodules via forward hooks."""

    def __init__(self, model, layer_names):
        self.model = model
        self.features = {}
        self.handles = []
        # Attach one hook per requested layer name.
        for name, layer in model.named_modules():
            if name in layer_names:
                self.handles.append(
                    layer.register_forward_hook(self._hook(name))
                )

    def _hook(self, name):
        # Closure that binds `name` so each hook knows where to store output.
        def hook(module, input, output):
            self.features[name] = output.detach()
        return hook

    def __call__(self, x):
        # Run a forward pass and return the captured activations.
        self.features = {}
        self.model(x)
        return self.features

    def remove(self):
        # Detach every hook; the extractor is inert afterwards.
        while self.handles:
            self.handles.pop().remove()


model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 5),
)
extractor = FeatureExtractor(model, ['0', '1'])
x = torch.randn(2, 10)
features = extractor(x)
for name, feature in features.items():
    print(f"{name}: {feature.shape}")

# Clean up the hooks.
extractor.remove()
构建复杂网络
多层感知机 (MLP)
import torch
import torch.nn as nn


class MLP(nn.Module):
    """Configurable multi-layer perceptron.

    Every hidden block is Linear -> BatchNorm1d -> ReLU -> Dropout; the final
    Linear emits raw logits (no activation — leave that to the loss function).
    """

    def __init__(self, input_size, hidden_sizes, output_size, dropout=0.2):
        super().__init__()
        layers = []
        width = input_size
        for hidden in hidden_sizes:
            layers += [
                nn.Linear(width, hidden),
                nn.BatchNorm1d(hidden),
                nn.ReLU(),
                nn.Dropout(dropout),
            ]
            width = hidden
        layers.append(nn.Linear(width, output_size))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)


model = MLP(input_size=784, hidden_sizes=[512, 256, 128], output_size=10)
print(model)

x = torch.randn(64, 784)
output = model(x)
print(f"输出形状: {output.shape}")  # torch.Size([64, 10])
带残差连接的深度网络
残差连接(Residual Connection)通过跳跃连接帮助梯度流动,使得训练更深的网络成为可能:
import torch.nn as nn
class ResidualBlock(nn.Module):
    """Two 3x3 conv layers with a skip connection (basic ResNet block)."""

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Identity shortcut by default; 1x1 projection when the shape changes.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        skip = self.shortcut(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Add the residual path, then apply the final activation.
        return self.relu(out + skip)
class ResNetLike(nn.Module):
    """Small ResNet-style classifier for 3-channel images."""

    def __init__(self, num_classes=10):
        super(ResNetLike, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(64, 64, 2, stride=1)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        # The first block may change channels/stride; the rest keep the shape.
        layers = [ResidualBlock(in_channels, out_channels, stride)]
        for _ in range(1, num_blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.layer2(self.layer1(x))
        x = self.avgpool(x)
        # Bug fix: use the tensor method instead of torch.flatten — this
        # snippet only imports torch.nn, so the bare name `torch` was undefined.
        x = x.flatten(1)
        return self.fc(x)
前向传播与推理
基本推理流程
import torch
import torch.nn as nn

model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10),
)

# Evaluation mode matters: it changes Dropout/BatchNorm behaviour.
model.eval()

x = torch.randn(1, 784)

# Option 1: call the module (preferred — also runs registered hooks).
with torch.no_grad():  # no autograd bookkeeping -> less memory
    output = model(x)

# Option 2: invoke forward directly (equivalent here).
with torch.no_grad():
    output = model.forward(x)

# argmax over the class dimension gives the predicted label.
pred = output.argmax(dim=1)
print(f"预测类别: {pred.item()}")
GPU 推理
# Pick the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

# Model and inputs must live on the same device.
model = model.to(device)
x = x.to(device)

with torch.no_grad():
    output = model(x)
模型参数管理
访问参数
# Iterate over the parameter tensors only.
for param in model.parameters():
    print(f"参数形状: {param.shape}")

# Iterate with the dotted attribute names included.
for name, param in model.named_parameters():
    print(f"{name}: {param.shape}")


def initialize_weights(m):
    """Xavier-init every Linear layer; zero its bias when present."""
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)


# apply() visits every submodule recursively.
model.apply(initialize_weights)
常见初始化方法
import torch.nn as nn
import torch.nn.init as init

# Fix: the original fragment applied initialisers to an undefined `layer`;
# create a demo layer so the snippet runs standalone.
layer = nn.Linear(10, 10)

# Xavier/Glorot (suited to tanh/sigmoid activations).
init.xavier_uniform_(layer.weight)
init.xavier_normal_(layer.weight)

# Kaiming/He (suited to ReLU/LeakyReLU activations).
init.kaiming_uniform_(layer.weight, nonlinearity='relu')
init.kaiming_normal_(layer.weight, nonlinearity='relu')

# Constant fills.
init.zeros_(layer.weight)
init.ones_(layer.weight)
init.constant_(layer.weight, 0.5)

# Gaussian.
init.normal_(layer.weight, mean=0, std=0.01)

# Truncated Gaussian, clipped to [a, b] (common in Transformers).
init.trunc_normal_(layer.weight, mean=0, std=0.02, a=-2, b=2)
参数共享
有时需要在多个层之间共享参数:
import torch
import torch.nn as nn


class SharedWeights(nn.Module):
    """Applies one Linear layer twice — both 'layers' are the same object."""

    def __init__(self):
        super().__init__()
        self.shared_linear = nn.Linear(10, 10)
        # Aliases, not copies: the parameters are registered exactly once.
        self.layer1 = self.shared_linear
        self.layer2 = self.shared_linear

    def forward(self, x):
        h = torch.relu(self.layer1(x))
        return self.layer2(h)  # same weights as layer1


model = SharedWeights()
x = torch.randn(2, 10)
y = model(x)
print(f"输出形状: {y.shape}")

# Sharing is visible in the parameter count: one 10x10 weight + one bias.
print(f"参数数量: {sum(p.numel() for p in model.parameters())}")  # 只有 110 个参数
完整示例:MNIST 分类器
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


class MNISTClassifier(nn.Module):
    """MLP for 28x28 digit images: 784 -> 512 -> 256 -> 10 logits."""

    def __init__(self):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(784, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 10),
        )

    def forward(self, x):
        # Flatten (N, 1, 28, 28) images into (N, 784) vectors.
        return self.network(x.view(-1, 784))


model = MNISTClassifier()

# Cross-entropy over logits; Adam with the usual 1e-3 learning rate.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Synthetic stand-in data (random images + random labels).
x_train = torch.randn(1000, 1, 28, 28)
y_train = torch.randint(0, 10, (1000,))
train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Standard training loop: forward, loss, backward, step.
model.train()
for epoch in range(5):
    total_loss = 0
    for batch_x, batch_y in train_loader:
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")

# Evaluation on fresh random inputs.
model.eval()
with torch.no_grad():
    x_test = torch.randn(10, 1, 28, 28)
    outputs = model(x_test)
    _, predicted = torch.max(outputs, 1)
    print(f"预测结果: {predicted}")
小结
本章我们系统学习了 PyTorch 神经网络模块的核心知识:
- nn.Module 基础:理解模块的层次结构、参数管理和设备处理
- 常用网络层:全连接层、激活函数、Dropout、BatchNorm 等
- 自定义层:使用 nn.Parameter 和 register_buffer 创建可复用的组件
- 自定义损失函数:实现 Dice Loss、Focal Loss 等专业损失
- 注意力机制:理解并实现多头注意力,构建 Transformer 块
- 模型钩子:在前向和反向传播中拦截数据,用于调试和特征提取
- 参数管理:初始化、共享和管理模型参数
参考资源
神经网络模型需要数据来训练。下一章我们将学习如何使用 PyTorch 的 Dataset 和 DataLoader 高效加载和预处理数据。