
Neural Networks

This chapter covers how to build neural networks with PyTorch's nn.Module, the standard way to define deep learning models. We start with basic layer definitions and work up to advanced topics such as custom layers, custom loss functions, and attention mechanisms.

nn.Module Basics

torch.nn.Module is the base class for all neural networks in PyTorch. It provides a standard way to organize model structure, manage parameters, and handle inputs and outputs. Understanding how nn.Module works is the foundation for building complex models.

Creating a Simple Neural Network

To define a neural network, subclass nn.Module and implement two core methods: __init__ to initialize layers and parameters, and forward to define the forward-pass logic.

import torch
import torch.nn as nn

# Define the network
class SimpleNet(nn.Module):
    def __init__(self):
        # Call the parent constructor (required)
        super(SimpleNet, self).__init__()

        # Define the layers
        self.fc1 = nn.Linear(784, 256)  # fully connected layer: 784 in, 256 out
        self.relu = nn.ReLU()           # activation function
        self.fc2 = nn.Linear(256, 10)   # output layer: 10 classes

    def forward(self, x):
        # Define the forward pass
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Create a model instance
model = SimpleNet()
print(model)

# Test the model
x = torch.randn(1, 784)  # one sample (a flattened 28x28 image)
output = model(x)
print(f"Output shape: {output.shape}")  # torch.Size([1, 10])

Core Features of nn.Module

nn.Module provides the following core features; understanding them is essential for using PyTorch correctly:

Automatic parameter tracking: every parameter defined via nn.Parameter and every nn.Module submodule assigned as an attribute is tracked automatically.

# Inspect the model's parameters
for name, param in model.named_parameters():
    print(f"{name}: {param.shape}")

# Count total and trainable parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

Device management: the model can be moved between CPU and GPU with a single call.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

Training/evaluation modes: switch between them with the train() and eval() methods; this changes the behavior of layers such as Dropout and BatchNorm.

model.train()  # training mode: Dropout is active, BatchNorm uses batch statistics
model.eval()   # evaluation mode: Dropout is disabled, BatchNorm uses running statistics

Module Hierarchy

nn.Module supports nesting, so complex hierarchies can be built. It is important to understand the difference between modules() and children():

import torch.nn as nn

class Block(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.norm = nn.BatchNorm1d(out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        return self.activation(self.norm(self.linear(x)))

class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.block1 = Block(784, 256)
        self.block2 = Block(256, 64)
        self.head = nn.Linear(64, 10)

    def forward(self, x):
        x = self.block1(x)
        x = self.block2(x)
        return self.head(x)

model = MyModel()

# children() returns only the direct submodules
print("Direct children:")
for name, child in model.named_children():
    print(f"  {name}: {child.__class__.__name__}")

# modules() returns all modules (including the model itself and nested modules)
print("\nAll modules:")
for module in model.modules():
    print(f"  {module.__class__.__name__}")

Common Layers

Fully Connected Layer (Linear)

The fully connected layer (also called a linear or dense layer) is the most basic layer in a neural network. It applies the affine transformation y = xA^T + b.

import torch
import torch.nn as nn

# nn.Linear(in_features, out_features, bias=True)
linear = nn.Linear(100, 50)

# input:  (batch_size, in_features)
# output: (batch_size, out_features)

x = torch.randn(32, 100)  # batch_size=32, 100 features
y = linear(x)
print(f"Output shape: {y.shape}")  # torch.Size([32, 50])

# Access the weight and bias
print(f"Weight shape: {linear.weight.shape}")  # [50, 100]
print(f"Bias shape: {linear.bias.shape}")      # [50]

Parameters

  • in_features: input feature dimension, the size of each input sample
  • out_features: output feature dimension, the size of each output sample
  • bias: whether to add a bias term (default True; see the sketch below)
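
Setting bias=False drops the bias term entirely, which is common right before a normalization layer. A minimal sketch of what that changes:

import torch.nn as nn

linear_nb = nn.Linear(100, 50, bias=False)
print(linear_nb.bias)                                  # None: no bias parameter is created
print(sum(p.numel() for p in linear_nb.parameters()))  # 5000: the weight matrix only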

Activation Layers

Activation functions introduce non-linearity into a neural network, enabling it to learn complex patterns. Without activation functions, a stack of linear layers is equivalent to a single linear transformation.
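
A quick numerical check of that last claim, assuming two small Linear layers: their composition is itself just one affine map.

import torch
import torch.nn as nn

fc1, fc2 = nn.Linear(8, 16), nn.Linear(16, 4)
x = torch.randn(5, 8)

# Composing two affine maps gives another affine map: W = W2 @ W1, b = W2 @ b1 + b2
W = fc2.weight @ fc1.weight
b = fc2.weight @ fc1.bias + fc2.bias
print(torch.allclose(fc2(fc1(x)), x @ W.t() + b, atol=1e-6))  # True

The activation layers that torch.nn provides are listed below.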

import torch
import torch.nn as nn

# ReLU (Rectified Linear Unit) - the most common choice
# Formula: f(x) = max(0, x)
relu = nn.ReLU()
print(relu(torch.tensor([-1.0, 0.0, 1.0])))  # tensor([0., 0., 1.])

# ReLU6 - clamps the output to the range [0, 6]
relu6 = nn.ReLU6()

# LeakyReLU - addresses the "dying ReLU" problem:
# negative inputs are not zeroed out but multiplied by a small slope
leaky_relu = nn.LeakyReLU(negative_slope=0.01)
print(leaky_relu(torch.tensor([-1.0, 0.0, 1.0])))  # tensor([-0.01, 0., 1.])

# PReLU - LeakyReLU with a learnable slope
prelu = nn.PReLU()  # the slope is a learnable parameter

# ELU - Exponential Linear Unit
elu = nn.ELU(alpha=1.0)

# GELU - Gaussian Error Linear Unit, common in Transformers
gelu = nn.GELU()

# SELU - self-normalizing activation
selu = nn.SELU()

# Sigmoid - output in (0, 1), often used for binary classification
# Formula: f(x) = 1 / (1 + exp(-x))
sigmoid = nn.Sigmoid()

# Tanh - output in (-1, 1)
# Formula: f(x) = (exp(x) - exp(-x)) / (exp(x) + exp(-x))
tanh = nn.Tanh()

# Softmax - multi-class output (along the given dimension)
# outputs sum to 1 and represent a probability distribution
softmax = nn.Softmax(dim=1)

# LogSoftmax - log of the Softmax (used with NLLLoss)
# numerically more stable
log_softmax = nn.LogSoftmax(dim=1)

# Swish/SiLU - self-gated activation, often outperforms ReLU
silu = nn.SiLU()  # also known as Swish

Choosing an Activation Function

  • Hidden layers: ReLU is the most common choice; for deep networks, consider GELU or Swish
  • Output layer: Sigmoid for binary classification, Softmax for multi-class (or rely on the Softmax built into CrossEntropyLoss); see the sketch below
  • RNN/LSTM: Tanh usually works better
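
As a small illustration of the output-layer advice (the layer sizes here are arbitrary), the usual pattern is to output raw logits and let the loss apply the non-linearity:

import torch
import torch.nn as nn

x = torch.randn(8, 64)

# Binary classification: one logit per sample; BCEWithLogitsLoss applies the Sigmoid internally
binary_head = nn.Linear(64, 1)
bce = nn.BCEWithLogitsLoss()
binary_targets = torch.randint(0, 2, (8, 1)).float()
print(bce(binary_head(x), binary_targets))

# Multi-class: raw logits go straight into CrossEntropyLoss (Softmax is built in)
multi_head = nn.Linear(64, 10)
ce = nn.CrossEntropyLoss()
class_targets = torch.randint(0, 10, (8,))
print(ce(multi_head(x), class_targets))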

Dropout Layers

Dropout is a regularization technique that randomly zeroes out part of the neuron outputs during training, preventing neurons from over-relying on specific features and thereby reducing overfitting.

import torch
import torch.nn as nn

# Dropout layer
dropout = nn.Dropout(p=0.5)  # p is the drop probability

x = torch.randn(1, 10)
y = dropout(x)
print(f"Before dropout: {x}")
print(f"After dropout: {y}")  # about half the elements become 0, the rest are scaled by 2

# Alpha Dropout - preserves the self-normalizing property (for SELU networks)
alpha_dropout = nn.AlphaDropout(p=0.5)

# Dropout2d - for convolutional networks, drops entire channels
dropout2d = nn.Dropout2d(p=0.5)

# Dropout3d - for 3D convolutions
dropout3d = nn.Dropout3d(p=0.5)

# Dropout is active during training and disabled during inference
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(20, 5)
)

model.train()  # training mode: Dropout is active
model.eval()   # evaluation mode: Dropout is disabled

Note: during training, Dropout scales the retained outputs by 1/(1-p) so that the expected value stays the same. In evaluation mode, the Dropout layer simply passes the input through.
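
A quick check of that scaling behavior (a minimal sketch):

import torch
import torch.nn as nn

drop = nn.Dropout(p=0.5)
x = torch.ones(10)

drop.train()
print(drop(x))  # kept elements are scaled to 1/(1-0.5) = 2.0, dropped ones are 0

drop.eval()
print(drop(x))  # identity in eval mode: all ones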

BatchNorm Layers

BatchNorm (batch normalization) normalizes each feature of every batch to zero mean and unit variance, then restores expressive power through learnable scale and shift parameters. This speeds up training, allows larger learning rates, and has a mild regularizing effect.

import torch
import torch.nn as nn

# 1D BatchNorm (commonly used in fully connected networks)
# input shape: (N, C) or (N, C, L)
bn1d = nn.BatchNorm1d(num_features=100)

# 2D BatchNorm (commonly used in convolutional networks)
# input shape: (N, C, H, W)
bn2d = nn.BatchNorm2d(num_features=64)

# 3D BatchNorm (commonly used with 3D convolutions)
# input shape: (N, C, D, H, W)
bn3d = nn.BatchNorm3d(num_features=64)

# Example
x = torch.randn(32, 100)  # batch=32, features=100
y = bn1d(x)
print(f"Mean after BatchNorm: {y.mean(dim=0).abs().max().item():.6f}")  # close to 0
print(f"Std after BatchNorm: {y.std(dim=0).mean().item():.6f}")         # close to 1

# BatchNorm has two learnable parameters
print(f"Scale parameter gamma: {bn1d.weight.shape}")  # [100]
print(f"Shift parameter beta: {bn1d.bias.shape}")     # [100]

# ...and two running statistics (not parameters)
print(f"Running mean: {bn1d.running_mean.shape}")  # [100]
print(f"Running var: {bn1d.running_var.shape}")    # [100]

Training vs. Inference

  • During training: the current batch's mean and variance are used for normalization, and the running mean and variance are updated
  • During inference: the running mean and variance accumulated during training are used (see the sketch below)
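
A small sketch of that difference, assuming a feature dimension of 4:

import torch
import torch.nn as nn

bn = nn.BatchNorm1d(4)
x = torch.randn(32, 4) * 3 + 5  # data with mean ~5 and std ~3

bn.train()
_ = bn(x)               # training: normalize with batch stats, update running stats
print(bn.running_mean)  # has moved toward ~5 (by momentum=0.1 per step)

bn.eval()
_ = bn(x)               # inference: the stored running statistics are used instead
print(bn.running_mean)  # unchanged in eval mode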

LayerNorm vs BatchNorm

LayerNorm (layer normalization) is an alternative that normalizes over all the features of each individual sample rather than over the batch dimension:

import torch
import torch.nn as nn

# LayerNorm - normalizes over the features of each sample
# widely used in Transformers
ln = nn.LayerNorm(normalized_shape=768)  # equivalently [768]; use [10, 768] to normalize over the last two dims

x = torch.randn(32, 10, 768)  # batch=32, seq_len=10, hidden=768
y = ln(x)

The advantage of LayerNorm is that it does not depend on the batch size, which makes it well suited to small batches and variable-length sequences.
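
A quick check of that batch-size independence (a minimal sketch): normalizing a single sample gives the same result as normalizing it as part of a larger batch.

import torch
import torch.nn as nn

ln = nn.LayerNorm(768)
x = torch.randn(4, 10, 768)

# Each position of each sample is normalized over its own 768 features,
# so sample 0's result does not depend on the rest of the batch.
print(torch.allclose(ln(x)[:1], ln(x[:1]), atol=1e-6))  # True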

Quick Construction with nn.Sequential

For simple feed-forward stacks, nn.Sequential lets you build the network quickly without writing a forward method:

import torch.nn as nn

# Option 1: pass the layers directly
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(256, 10)
)

# Option 2: use an OrderedDict (gives each layer a name)
from collections import OrderedDict

model = nn.Sequential(OrderedDict([
    ('fc1', nn.Linear(784, 256)),
    ('relu1', nn.ReLU()),
    ('dropout', nn.Dropout(0.2)),
    ('fc2', nn.Linear(256, 10))
]))

# Access a specific layer
print(model.fc1)
print(model[0])  # access by index

# Append a layer
model.add_module('output', nn.Softmax(dim=1))

Custom Layers

When the built-in layers are not enough, you can create custom layers by subclassing nn.Module. A custom layer must manage its parameters correctly and implement the forward pass.

Custom Layers without Parameters

If the layer needs no learnable parameters, the implementation is very simple:

import torch
import torch.nn as nn

class CenteredLayer(nn.Module):
    """Centers the input by subtracting its mean."""

    def __init__(self):
        super().__init__()

    def forward(self, X):
        return X - X.mean()

# Usage
layer = CenteredLayer()
x = torch.FloatTensor([1, 2, 3, 4, 5])
print(layer(x))  # tensor([-2., -1., 0., 1., 2.])

Custom Layers with Parameters

Use nn.Parameter to define learnable parameters; they are tracked automatically:

import torch
import torch.nn as nn

class MyLinear(nn.Module):
    """A custom fully connected layer."""

    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        # Define learnable parameters
        self.weight = nn.Parameter(torch.randn(out_features, in_features))
        self.bias = nn.Parameter(torch.randn(out_features))

    def forward(self, x):
        # Apply the linear transformation
        return torch.matmul(x, self.weight.t()) + self.bias

    def extra_repr(self):
        """Customize the printed representation."""
        return f'in_features={self.in_features}, out_features={self.out_features}'

# Use the custom layer
linear = MyLinear(10, 5)
print(linear)  # shows the extra_repr content

x = torch.randn(2, 10)
y = linear(x)
print(f"Output shape: {y.shape}")

# The parameters are tracked automatically
print(f"Number of parameters: {sum(p.numel() for p in linear.parameters())}")

Custom Layers with Buffers

Besides trainable parameters, a layer sometimes needs to store non-trainable state (such as BatchNorm's running statistics). Use register_buffer to register such buffers:

import torch
import torch.nn as nn

class RunningNorm(nn.Module):
    """Running normalization (similar to BatchNorm but with no learnable parameters)."""

    def __init__(self, num_features, momentum=0.1):
        super().__init__()
        self.momentum = momentum

        # Register buffers (excluded from gradient computation, but moved with the model across devices)
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # During training, update the running statistics
            mean = x.mean(dim=0)
            var = x.var(dim=0, unbiased=False)

            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var

            return (x - mean) / torch.sqrt(var + 1e-5)
        else:
            # During inference, use the running statistics
            return (x - self.running_mean) / torch.sqrt(self.running_var + 1e-5)

# Usage
norm = RunningNorm(10)
norm.train()
x = torch.randn(32, 10)
y = norm(x)

# Buffers do not appear in parameters(), but they do appear in state_dict()
print(f"Parameters: {list(norm.parameters())}")  # empty
print(f"Buffers: {list(norm.buffers())}")        # [running_mean, running_var]

Parameters vs. Buffers

  • nn.Parameter: trainable, receives gradients, and appears in model.parameters()
  • register_buffer: non-trainable state, no gradients, appears in model.buffers(), but moves with the model across devices (see the sketch below)
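
Both kinds of state end up in state_dict(), so they are saved and loaded together. A minimal sketch (the module and attribute names are illustrative):

import torch
import torch.nn as nn

class Example(nn.Module):
    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(3))
        self.register_buffer('count', torch.zeros(1))

m = Example()
print([n for n, _ in m.named_parameters()])  # ['weight']
print([n for n, _ in m.named_buffers()])     # ['count']
print(list(m.state_dict().keys()))           # ['weight', 'count'] - both are serialized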

Composite Custom Layers

Custom layers can compose other layers to implement more complex logic:

import torch
import torch.nn as nn

class SqueezeExcitation(nn.Module):
    """Squeeze-and-Excitation block."""

    def __init__(self, channels, reduction=16):
        super().__init__()
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()

        # Squeeze: global average pooling
        y = self.squeeze(x).view(b, c)

        # Excitation: learn per-channel weights
        y = self.excitation(y).view(b, c, 1, 1)

        # Scale: reweight the channels
        return x * y.expand_as(x)

# Usage
se = SqueezeExcitation(channels=64)
x = torch.randn(8, 64, 32, 32)
y = se(x)
print(f"Output shape: {y.shape}")

Custom Loss Functions

Beyond the built-in losses (such as nn.CrossEntropyLoss and nn.MSELoss), you sometimes need a custom loss function for specific requirements.

Loss as a Function

The simplest approach is to define a plain function:

import torch
import torch.nn.functional as F

def dice_loss(pred, target, smooth=1.0):
    """
    Dice loss, commonly used in image segmentation.

    Dice = 2|A∩B| / (|A|+|B|)
    """
    pred = torch.sigmoid(pred)
    pred_flat = pred.view(-1)
    target_flat = target.view(-1)

    intersection = (pred_flat * target_flat).sum()
    dice = (2.0 * intersection + smooth) / (pred_flat.sum() + target_flat.sum() + smooth)

    return 1.0 - dice

# Usage
pred = torch.randn(4, 1, 32, 32, requires_grad=True)
target = torch.randint(0, 2, (4, 1, 32, 32)).float()
loss = dice_loss(pred, target)
loss.backward()

Loss as a Module

If the loss function needs learnable parameters or configuration, it can be defined as an nn.Module:

import torch
import torch.nn as nn
import torch.nn.functional as F

class FocalLoss(nn.Module):
    """
    Focal Loss - addresses class imbalance.

    FL(p_t) = -α_t * (1 - p_t)^γ * log(p_t)

    γ: focusing parameter, increases the emphasis on hard examples
    α: class weighting factor
    """

    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        # inputs: [N, C] unnormalized logits
        # targets: [N] class indices

        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        pt = torch.exp(-ce_loss)  # probability assigned to the correct class

        # Focal Loss; alpha is applied here as a scalar weight.
        # For per-class weighting, pass a weight tensor to F.cross_entropy instead.
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss

# Usage
criterion = FocalLoss(alpha=0.25, gamma=2.0)
pred = torch.randn(8, 10)
target = torch.randint(0, 10, (8,))
loss = criterion(pred, target)
print(f"Focal Loss: {loss.item()}")

Combining Multiple Losses

In multi-task learning, it is common to combine several loss functions:

import torch
import torch.nn as nn

class MultiTaskLoss(nn.Module):
    """Multi-task loss."""

    def __init__(self, num_tasks, weights=None):
        super().__init__()
        if weights is None:
            weights = torch.ones(num_tasks)
        else:
            weights = torch.as_tensor(weights, dtype=torch.float)  # register_buffer requires a tensor
        self.register_buffer('weights', weights)

        self.task_losses = nn.ModuleList([
            nn.CrossEntropyLoss() for _ in range(num_tasks)
        ])

    def forward(self, predictions, targets):
        """
        predictions: list of tensors, one prediction per task
        targets: list of tensors, one target per task
        """
        total_loss = 0.0
        for i, (pred, target, loss_fn) in enumerate(zip(predictions, targets, self.task_losses)):
            total_loss += self.weights[i] * loss_fn(pred, target)
        return total_loss

# Usage
criterion = MultiTaskLoss(num_tasks=2, weights=[1.0, 0.5])
pred1 = torch.randn(8, 10)
pred2 = torch.randn(8, 5)
target1 = torch.randint(0, 10, (8,))
target2 = torch.randint(0, 5, (8,))

loss = criterion([pred1, pred2], [target1, target2])
print(f"Multi-task loss: {loss.item()}")

Attention Mechanisms

Attention is a core technique in modern deep learning, used extensively in the Transformer architecture. PyTorch provides the nn.MultiheadAttention module, and the mechanism can also be implemented by hand.

Using nn.MultiheadAttention

PyTorch's built-in nn.MultiheadAttention implements standard multi-head attention:

import torch
import torch.nn as nn

# Create a multi-head attention layer
embed_dim = 512  # embedding dimension
num_heads = 8    # number of attention heads
mha = nn.MultiheadAttention(
    embed_dim=embed_dim,
    num_heads=num_heads,
    dropout=0.1,
    bias=True,
    batch_first=True  # inputs have shape (batch, seq, feature)
)

# Inputs
batch_size = 4
seq_len = 10
query = torch.randn(batch_size, seq_len, embed_dim)
key = torch.randn(batch_size, seq_len, embed_dim)
value = torch.randn(batch_size, seq_len, embed_dim)

# Compute attention
attn_output, attn_weights = mha(query, key, value)

print(f"Output shape: {attn_output.shape}")              # [4, 10, 512]
print(f"Attention weights shape: {attn_weights.shape}")  # [4, 10, 10]

# Self-attention (query = key = value)
self_attn_output, _ = mha(query, query, query)

Parameters

  • embed_dim: total embedding dimension, split evenly across the heads
  • num_heads: number of parallel attention heads; each head has dimension embed_dim // num_heads
  • dropout: dropout probability applied to the attention weights
  • batch_first: whether the input shape is (batch, seq, feature) or (seq, batch, feature)

Causal Attention (Autoregressive)

In autoregressive settings such as language modeling, a causal mask is needed to prevent a position from attending to future positions:

import torch
import torch.nn as nn

embed_dim = 256
num_heads = 4
seq_len = 10

mha = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

# Build the causal mask
# True means "not allowed to attend"
causal_mask = torch.triu(torch.ones(seq_len, seq_len), diagonal=1).bool()
print("Causal mask:")
print(causal_mask.int())
# [[0, 1, 1, 1, ...],
#  [0, 0, 1, 1, ...],
#  [0, 0, 0, 1, ...],
#  ...]

query = torch.randn(2, seq_len, embed_dim)

# Apply the mask via attn_mask
output, _ = mha(query, query, query, attn_mask=causal_mask)

# On PyTorch 2.0+, is_causal=True can additionally be passed alongside attn_mask
# as a hint that the mask is causal, enabling faster fused attention kernels:
# output, _ = mha(query, query, query, attn_mask=causal_mask, is_causal=True)

Implementing Multi-Head Attention from Scratch

Implementing it by hand is the best way to understand what attention does internally:

import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
    """A hand-rolled multi-head attention module."""

    def __init__(self, embed_dim, num_heads, dropout=0.0):
        super().__init__()
        assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"

        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # Linear projections
        self.q_proj = nn.Linear(embed_dim, embed_dim)
        self.k_proj = nn.Linear(embed_dim, embed_dim)
        self.v_proj = nn.Linear(embed_dim, embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, attn_mask=None):
        """
        query, key, value: [batch, seq_len, embed_dim]
        attn_mask: [seq_len, seq_len] or [batch, seq_len, seq_len], True = masked out
        """
        batch_size, seq_len, _ = query.shape

        # Linear projections
        Q = self.q_proj(query)  # [batch, seq, embed]
        K = self.k_proj(key)
        V = self.v_proj(value)

        # Reshape into heads: [batch, num_heads, seq, head_dim]
        Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)

        # Attention scores
        # [batch, heads, seq, head_dim] @ [batch, heads, head_dim, seq] -> [batch, heads, seq, seq]
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)

        # Apply the mask
        if attn_mask is not None:
            scores = scores.masked_fill(attn_mask, float('-inf'))

        # Softmax
        attn_weights = torch.softmax(scores, dim=-1)
        attn_weights = self.dropout(attn_weights)

        # Weighted sum of the values
        # [batch, heads, seq, seq] @ [batch, heads, seq, head_dim] -> [batch, heads, seq, head_dim]
        attn_output = torch.matmul(attn_weights, V)

        # Merge the heads back: [batch, seq, embed]
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

        # Output projection
        output = self.out_proj(attn_output)

        return output, attn_weights

# Use the custom implementation
mha = MultiHeadAttention(embed_dim=512, num_heads=8)
query = torch.randn(2, 10, 512)
output, weights = mha(query, query, query)
print(f"Output shape: {output.shape}")
print(f"Weights shape: {weights.shape}")

Transformer Encoder Block

Combining attention with a feed-forward network gives a complete Transformer encoder block:

import torch
import torch.nn as nn

class TransformerEncoderBlock(nn.Module):
    """A Transformer encoder block."""

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()

        # Multi-head self-attention
        self.self_attn = nn.MultiheadAttention(
            embed_dim, num_heads, dropout=dropout, batch_first=True
        )

        # Feed-forward network
        self.ffn = nn.Sequential(
            nn.Linear(embed_dim, ff_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, embed_dim),
            nn.Dropout(dropout)
        )

        # Layer normalization
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x, attn_mask=None):
        # Self-attention + residual connection + LayerNorm (Post-LN layout)
        attn_output, _ = self.self_attn(x, x, x, attn_mask=attn_mask)
        x = self.norm1(x + self.dropout(attn_output))

        # Feed-forward network + residual connection + LayerNorm
        ffn_output = self.ffn(x)
        x = self.norm2(x + ffn_output)

        return x

# Usage
block = TransformerEncoderBlock(embed_dim=512, num_heads=8, ff_dim=2048)
x = torch.randn(2, 10, 512)
output = block(x)
print(f"Output shape: {output.shape}")

Model Hooks

Hooks let you intercept and modify data during the forward or backward pass, which is very useful for visualization, debugging, and implementing more advanced functionality.

Forward Hooks

A forward hook runs custom code around a layer's forward pass:

import torch
import torch.nn as nn

model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 5)
)

# Define a forward hook
def forward_hook(module, input, output):
    """Called after the module's forward pass."""
    print(f"{module.__class__.__name__} forward:")
    print(f"  input shape: {input[0].shape}")
    print(f"  output shape: {output.shape}")

# Register the hook
hook_handle = model[0].register_forward_hook(forward_hook)

# Forward pass
x = torch.randn(2, 10)
y = model(x)

# Remove the hook
hook_handle.remove()

Backward Hooks

A backward hook gives access to gradients during the backward pass:

import torch
import torch.nn as nn

model = nn.Linear(10, 5)

# Define a backward hook
def backward_hook(module, grad_input, grad_output):
    """Called during the module's backward pass."""
    print(f"{module.__class__.__name__} backward:")
    print(f"  input gradients: {[g.shape for g in grad_input if g is not None]}")
    print(f"  output gradients: {[g.shape for g in grad_output if g is not None]}")

# Register the hook (register_backward_hook is deprecated in favor of register_full_backward_hook)
hook_handle = model.register_full_backward_hook(backward_hook)

# Forward pass
x = torch.randn(2, 10)
y = model(x)
loss = y.sum()

# Backward pass
loss.backward()

# Remove the hook
hook_handle.remove()

Practical Example: Feature Extraction

Hooks can be used to extract intermediate-layer features:

import torch
import torch.nn as nn

class FeatureExtractor:
    """Extracts intermediate-layer features from a model."""

    def __init__(self, model, layer_names):
        self.model = model
        self.features = {}
        self.handles = []

        for name, layer in model.named_modules():
            if name in layer_names:
                handle = layer.register_forward_hook(self._hook(name))
                self.handles.append(handle)

    def _hook(self, name):
        def hook(module, input, output):
            self.features[name] = output.detach()
        return hook

    def __call__(self, x):
        self.features = {}
        self.model(x)
        return self.features

    def remove(self):
        for handle in self.handles:
            handle.remove()

# Usage
model = nn.Sequential(
    nn.Linear(10, 20),
    nn.ReLU(),
    nn.Linear(20, 5)
)

extractor = FeatureExtractor(model, ['0', '1'])

x = torch.randn(2, 10)
features = extractor(x)

for name, feature in features.items():
    print(f"{name}: {feature.shape}")

# Clean up
extractor.remove()

Building Complex Networks

Multilayer Perceptron (MLP)

import torch
import torch.nn as nn

class MLP(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size, dropout=0.2):
        super(MLP, self).__init__()

        # Build the hidden layers
        layers = []
        prev_size = input_size

        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.BatchNorm1d(hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            prev_size = hidden_size

        # Output layer (usually no activation here; let the loss function handle it)
        layers.append(nn.Linear(prev_size, output_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

# Create the model
model = MLP(input_size=784, hidden_sizes=[512, 256, 128], output_size=10)
print(model)

# Test
x = torch.randn(64, 784)
output = model(x)
print(f"Output shape: {output.shape}")  # torch.Size([64, 10])

Deep Networks with Residual Connections

Residual (skip) connections help gradients flow through the network, making it possible to train much deeper models:

import torch
import torch.nn as nn

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super(ResidualBlock, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut connection (a projection is needed when the dimensions do not match)
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        residual = self.shortcut(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += residual  # residual connection
        out = self.relu(out)

        return out

class ResNetLike(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNetLike, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(64, 64, 2, stride=1)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, num_classes)

    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        layers = []
        layers.append(ResidualBlock(in_channels, out_channels, stride))
        for _ in range(1, num_blocks):
            layers.append(ResidualBlock(out_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = self.layer2(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

Forward Pass and Inference

Basic Inference Workflow

import torch
import torch.nn as nn

# Create the model
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
)

# Switch to evaluation mode (important: affects Dropout, BatchNorm, etc.)
model.eval()

# Inference
x = torch.randn(1, 784)

# Option 1: call the model directly
with torch.no_grad():  # disable gradient tracking to save memory
    output = model(x)

# Option 2: call forward explicitly (equivalent)
with torch.no_grad():
    output = model.forward(x)

# Get the predicted class
pred = output.argmax(dim=1)
print(f"Predicted class: {pred.item()}")

GPU Inference

# Check whether a GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Move the model to the GPU
model = model.to(device)

# Move the input to the GPU
x = x.to(device)

# Inference
with torch.no_grad():
    output = model(x)

Managing Model Parameters

Accessing Parameters

# Option 1: iterate over the parameters directly
for param in model.parameters():
    print(f"Parameter shape: {param.shape}")

# Option 2: iterate with names
for name, param in model.named_parameters():
    print(f"{name}: {param.shape}")

# Initialize the parameters
def initialize_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        if m.bias is not None:
            nn.init.zeros_(m.bias)

model.apply(initialize_weights)

Common Initialization Methods

import torch.nn as nn
import torch.nn.init as init

layer = nn.Linear(100, 50)  # example layer to initialize

# Xavier initialization (suited to tanh/sigmoid activations)
init.xavier_uniform_(layer.weight)
init.xavier_normal_(layer.weight)

# Kaiming initialization (suited to ReLU/LeakyReLU)
init.kaiming_uniform_(layer.weight, nonlinearity='relu')
init.kaiming_normal_(layer.weight, nonlinearity='relu')

# Constant initialization
init.zeros_(layer.weight)
init.ones_(layer.weight)
init.constant_(layer.weight, 0.5)

# Normal distribution
init.normal_(layer.weight, mean=0, std=0.01)

# Truncated normal (common in Transformers)
init.trunc_normal_(layer.weight, mean=0, std=0.02, a=-2, b=2)

Parameter Sharing

Sometimes you want several layers to share the same parameters:

import torch
import torch.nn as nn

class SharedWeights(nn.Module):
    """A model with shared weights."""

    def __init__(self):
        super().__init__()

        # Create one shared linear layer
        self.shared_linear = nn.Linear(10, 10)

        # Reference the same layer in multiple places
        self.layer1 = self.shared_linear
        self.layer2 = self.shared_linear  # shared weights

    def forward(self, x):
        x = self.layer1(x)
        x = torch.relu(x)
        x = self.layer2(x)  # uses the same weights
        return x

# Usage
model = SharedWeights()
x = torch.randn(2, 10)
y = model(x)
print(f"Output shape: {y.shape}")

# Verify the sharing
print(f"Number of parameters: {sum(p.numel() for p in model.parameters())}")  # only 110 parameters

Complete Example: MNIST Classifier

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Define the model
class MNISTClassifier(nn.Module):
    def __init__(self):
        super(MNISTClassifier, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(784, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        x = x.view(-1, 784)  # flatten
        return self.network(x)

# Create the model
model = MNISTClassifier()

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Fake data for demonstration
x_train = torch.randn(1000, 1, 28, 28)
y_train = torch.randint(0, 10, (1000,))

train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Training
model.train()
for epoch in range(5):
    total_loss = 0
    for batch_x, batch_y in train_loader:
        # Forward pass
        outputs = model(batch_x)
        loss = criterion(outputs, batch_y)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")

# Evaluation
model.eval()
with torch.no_grad():
    x_test = torch.randn(10, 1, 28, 28)
    outputs = model(x_test)
    _, predicted = torch.max(outputs, 1)
    print(f"Predictions: {predicted}")

Summary

In this chapter we worked through the core of PyTorch's neural network module:

  1. nn.Module basics: module hierarchy, parameter management, and device handling
  2. Common layers: fully connected layers, activation functions, Dropout, BatchNorm, and more
  3. Custom layers: building reusable components with nn.Parameter and register_buffer
  4. Custom loss functions: implementing specialized losses such as Dice Loss and Focal Loss
  5. Attention mechanisms: understanding and implementing multi-head attention, and building Transformer blocks
  6. Model hooks: intercepting data in the forward and backward passes for debugging and feature extraction
  7. Parameter management: initializing, sharing, and managing model parameters


Neural network models need data to train on. In the next chapter we will learn how to load and preprocess data efficiently with PyTorch's Dataset and DataLoader.