
Recurrent Neural Networks (RNN/LSTM)

A recurrent neural network (RNN) is a deep learning model for sequence data. This chapter covers the basic ideas behind RNNs, the improvements LSTM introduces, and how to implement both in PyTorch.

Sequence Data and RNNs

What Is Sequence Data?

Sequence data is data with temporal or positional dependencies: the value at the current step depends on earlier steps. Common examples include:

  • Text: sequences of words, where each word depends on its context
  • Speech: sequences of audio samples
  • Time series: stock prices, weather measurements, etc.
  • Video: sequences of image frames

The Core Idea of RNNs

An RNN passes a hidden state from one time step to the next, giving the network a memory of earlier inputs:

import torch
import torch.nn as nn

rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=1, batch_first=True)

# a batch of 32 sequences, each 5 steps long, with 10 features per step
x = torch.randn(32, 5, 10)

output, hidden = rnn(x)

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Hidden state shape: {hidden.shape}")

How an RNN Works

At each time step, an RNN performs the following computation:

h_t = tanh(W_ih * x_t + W_hh * h_{t-1} + b_h)
y_t = W_hy * h_t + b_y

where:

  • x_t is the input at the current time step
  • h_{t-1} is the hidden state from the previous time step
  • h_t is the hidden state at the current time step
  • y_t is the output at the current time step
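To make the recurrence concrete, here is one RNN time step written out by hand (an illustrative sketch with made-up weight tensors, not the internals of nn.RNN):

import torch

# one RNN time step, written out by hand (illustrative sketch)
batch, input_size, hidden_size = 4, 10, 20
x_t = torch.randn(batch, input_size)          # input at time step t
h_prev = torch.zeros(batch, hidden_size)      # hidden state from step t-1

W_ih = torch.randn(hidden_size, input_size)   # input-to-hidden weights
W_hh = torch.randn(hidden_size, hidden_size)  # hidden-to-hidden weights
b_h = torch.zeros(hidden_size)

# h_t = tanh(W_ih * x_t + W_hh * h_{t-1} + b_h)
h_t = torch.tanh(x_t @ W_ih.T + h_prev @ W_hh.T + b_h)
print(h_t.shape)  # torch.Size([4, 20])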

The PyTorch RNN Module

Basic Usage

import torch
import torch.nn as nn

rnn = nn.RNN(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    nonlinearity='tanh',
    bias=True,
    batch_first=True,
    dropout=0.1,
    bidirectional=False
)

batch_size = 32
seq_len = 5
input_size = 10

x = torch.randn(batch_size, seq_len, input_size)

h0 = torch.zeros(2, batch_size, 20)  # (num_layers, batch_size, hidden_size)

output, hidden = rnn(x, h0)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"最终隐藏状态形状: {hidden.shape}")

Parameter Details

Parameter       Description                                    Default
input_size      Size of each input feature vector              required
hidden_size     Size of the hidden state                       required
num_layers      Number of stacked RNN layers                   1
nonlinearity    Activation function                            'tanh'
bias            Whether to use bias terms                      True
batch_first     Whether inputs are (batch, seq, feature)       False
dropout         Dropout probability between layers             0
bidirectional   Whether the RNN is bidirectional               False

Input and Output Shapes

import torch
import torch.nn as nn

rnn = nn.RNN(input_size=10, hidden_size=20, batch_first=True)

x = torch.randn(32, 5, 10)
output, hidden = rnn(x)

print("batch_first=True:")
print(f" 输入: (batch_size, seq_len, input_size) = {x.shape}")
print(f" 输出: (batch_size, seq_len, hidden_size) = {output.shape}")
print(f" 隐藏: (num_layers, batch_size, hidden_size) = {hidden.shape}")

rnn = nn.RNN(input_size=10, hidden_size=20, batch_first=False)

x = torch.randn(5, 32, 10)
output, hidden = rnn(x)

print("\nbatch_first=False:")
print(f" 输入: (seq_len, batch_size, input_size) = {x.shape}")
print(f" 输出: (seq_len, batch_size, hidden_size) = {output.shape}")
print(f" 隐藏: (num_layers, batch_size, hidden_size) = {hidden.shape}")

LSTM Networks

The Problem LSTM Solves

Standard RNNs suffer from vanishing gradients, which makes long-range dependencies hard to learn. LSTM (Long Short-Term Memory) addresses this with a gating mechanism:

  • Forget gate: decides which past information to discard
  • Input gate: decides which new information to store
  • Output gate: decides which information to expose as output

LSTM Structure

LSTM cell structure (diagram omitted): the previous cell state c_{t-1} flows through to c_t along a mostly untouched path. The forget gate scales the old cell state, the input gate adds new candidate information computed from x_t and h_{t-1} through a tanh, and the output gate decides how much of tanh(c_t) becomes the new hidden state h_t, which is also the cell's output.
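To make the gating concrete, here is one LSTM time step written out by hand (an illustrative sketch with made-up weight tensors; nn.LSTM's internal implementation differs):

import torch

# one LSTM time step, written out by hand (illustrative sketch)
batch, input_size, hidden_size = 4, 10, 20
x_t = torch.randn(batch, input_size)
h_prev = torch.zeros(batch, hidden_size)
c_prev = torch.zeros(batch, hidden_size)

# one block of weights for all four gate computations
W_x = torch.randn(4 * hidden_size, input_size)
W_h = torch.randn(4 * hidden_size, hidden_size)
b = torch.zeros(4 * hidden_size)

gates = x_t @ W_x.T + h_prev @ W_h.T + b
i, f, g, o = gates.chunk(4, dim=1)

i = torch.sigmoid(i)        # input gate: which new information to write
f = torch.sigmoid(f)        # forget gate: which old information to keep
g = torch.tanh(g)           # candidate cell state
o = torch.sigmoid(o)        # output gate: which parts of the cell to expose

c_t = f * c_prev + i * g    # update the cell state
h_t = o * torch.tanh(c_t)   # new hidden state

print(h_t.shape, c_t.shape)  # torch.Size([4, 20]) torch.Size([4, 20])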

Using LSTM in PyTorch

import torch
import torch.nn as nn

lstm = nn.LSTM(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    bias=True,
    batch_first=True,
    dropout=0.1,
    bidirectional=False
)

batch_size = 32
seq_len = 5
input_size = 10

x = torch.randn(batch_size, seq_len, input_size)

h0 = torch.zeros(2, batch_size, 20)
c0 = torch.zeros(2, batch_size, 20)

output, (hidden, cell) = lstm(x, (h0, c0))

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")
print(f"隐藏状态形状: {hidden.shape}")
print(f"单元状态形状: {cell.shape}")

LSTM vs RNN

import torch
import torch.nn as nn

input_size = 10
hidden_size = 20
batch_size = 32
seq_len = 5

x = torch.randn(batch_size, seq_len, input_size)

rnn = nn.RNN(input_size, hidden_size, batch_first=True)
lstm = nn.LSTM(input_size, hidden_size, batch_first=True)

rnn_output, rnn_hidden = rnn(x)
lstm_output, (lstm_hidden, lstm_cell) = lstm(x)

print("RNN:")
print(f" 输出: {rnn_output.shape}")
print(f" 隐藏状态: {rnn_hidden.shape}")

print("\nLSTM:")
print(f" 输出: {lstm_output.shape}")
print(f" 隐藏状态: {lstm_hidden.shape}")
print(f" 单元状态: {lstm_cell.shape}")

GRU Networks

GRU (Gated Recurrent Unit) is a simplified variant of LSTM with fewer parameters and faster training:

import torch
import torch.nn as nn

gru = nn.GRU(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    dropout=0.1
)

x = torch.randn(32, 5, 10)
output, hidden = gru(x)

print(f"GRU 输出形状: {output.shape}")
print(f"GRU 隐藏状态形状: {hidden.shape}")

Comparing RNN, LSTM, and GRU

Feature                   RNN               LSTM              GRU
Parameter count           fewest            most              moderate
Training speed            fastest           slowest           moderate
Long-term dependencies    poor              good              good
Number of gates           0                 3                 2
Typical use               short sequences   long sequences    long sequences
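As a rough check on the parameter-count row, you can count parameters directly (a small sketch reusing the input_size=10, hidden_size=20 setup from the earlier examples):

import torch.nn as nn

# RNN has one set of weights, GRU three (2 gates + candidate), LSTM four (3 gates + candidate)
for name, module in [("RNN", nn.RNN(10, 20)), ("GRU", nn.GRU(10, 20)), ("LSTM", nn.LSTM(10, 20))]:
    n_params = sum(p.numel() for p in module.parameters())
    print(f"{name}: {n_params} parameters")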

Bidirectional RNNs

A bidirectional RNN processes the sequence in both directions, so every position sees both past and future context:

import torch
import torch.nn as nn

bilstm = nn.LSTM(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    bidirectional=True
)

x = torch.randn(32, 5, 10)
output, (hidden, cell) = bilstm(x)

print(f"双向 LSTM 输出形状: {output.shape}")
print(f" 注意: hidden_size * 2 = {20 * 2}")

print(f"隐藏状态形状: {hidden.shape}")
print(f" 注意: num_layers * 2 = {2 * 2}")

Building Sequence Models

Text Classification Model

import torch
import torch.nn as nn

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )

        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)

        lstm_out, (hidden, _) = self.lstm(embedded)

        # concatenate the last layer's forward and backward final hidden states
        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)

        out = self.dropout(hidden_cat)
        out = self.fc(out)

        return out

vocab_size = 10000
embedding_dim = 128
hidden_size = 256
num_layers = 2
num_classes = 5

model = TextClassifier(vocab_size, embedding_dim, hidden_size, num_layers, num_classes)

x = torch.randint(0, vocab_size, (32, 50))
output = model(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")

Sequence Labeling Model

import torch
import torch.nn as nn

class SequenceLabeling(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_tags, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)

        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )

        self.fc = nn.Linear(hidden_size * 2, num_tags)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)

        lstm_out, _ = self.lstm(embedded)

        lstm_out = self.dropout(lstm_out)

        out = self.fc(lstm_out)  # one tag score vector per time step

        return out

vocab_size = 10000
embedding_dim = 128
hidden_size = 256
num_layers = 2
num_tags = 5

model = SequenceLabeling(vocab_size, embedding_dim, hidden_size, num_layers, num_tags)

x = torch.randint(0, vocab_size, (32, 50))
output = model(x)

print(f"输入形状: {x.shape}")
print(f"输出形状: {output.shape}")

Sequence-to-Sequence Model

import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden, cell):
        embedded = self.dropout(self.embedding(x))
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        output = self.fc(output)
        return output, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        trg_len = trg.size(1)
        trg_vocab_size = self.decoder.fc.out_features

        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        # encode the source sequence; its final states initialize the decoder
        _, hidden, cell = self.encoder(src)

        # the first decoder input is the start-of-sequence token
        input = trg[:, 0].unsqueeze(1)

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output.squeeze(1)

            # with probability teacher_forcing_ratio feed the ground-truth token,
            # otherwise feed the model's own prediction
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(2)
            input = trg[:, t].unsqueeze(1) if teacher_force else top1

        return outputs

vocab_size = 10000
embedding_dim = 256
hidden_size = 512
num_layers = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder(vocab_size, embedding_dim, hidden_size, num_layers)
decoder = Decoder(vocab_size, embedding_dim, hidden_size, num_layers)
seq2seq = Seq2Seq(encoder, decoder, device).to(device)

src = torch.randint(0, vocab_size, (32, 20)).to(device)
trg = torch.randint(0, vocab_size, (32, 15)).to(device)

output = seq2seq(src, trg)
print(f"Seq2Seq 输出形状: {output.shape}")

Handling Variable-Length Sequences

Using pack_padded_sequence

When the sequences in a batch have different lengths, packing them avoids wasted computation on padding:

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

lstm = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)

batch_size = 4
max_seq_len = 6
input_size = 10

x = torch.randn(batch_size, max_seq_len, input_size)

lengths = torch.tensor([6, 4, 3, 1])

sorted_lengths, sorted_indices = torch.sort(lengths, descending=True)
sorted_x = x[sorted_indices]

packed_x = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)

packed_output, (hidden, cell) = lstm(packed_x)

output, output_lengths = pad_packed_sequence(packed_output, batch_first=True)

print(f"原始输入形状: {x.shape}")
print(f"序列长度: {lengths.tolist()}")
print(f"打包后输出形状: {output.shape}")
print(f"输出长度: {output_lengths.tolist()}")

A Complete Variable-Length Pipeline

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class VarLenLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x, lengths):
        # pack_padded_sequence expects sequences sorted by descending length
        sorted_lengths, sorted_indices = torch.sort(lengths, descending=True)
        sorted_x = x[sorted_indices]

        packed_x = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)

        packed_output, (hidden, _) = self.lstm(packed_x)

        output, _ = pad_packed_sequence(packed_output, batch_first=True)

        # restore the original batch order
        _, unsorted_indices = torch.sort(sorted_indices)
        output = output[unsorted_indices]

        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)
        hidden_cat = hidden_cat[unsorted_indices]

        out = self.fc(hidden_cat)
        return out

model = VarLenLSTM(input_size=10, hidden_size=20, num_layers=2, num_classes=5)

x = torch.randn(4, 6, 10)
lengths = torch.tensor([6, 4, 3, 1])

output = model(x, lengths)
print(f"输出形状: {output.shape}")

Case Study: Sentiment Analysis

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=100):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]

        tokens = text.lower().split()[:self.max_len]
        indices = [self.vocab.get(token, self.vocab['<unk>']) for token in tokens]

        return torch.tensor(indices), torch.tensor(label)

def collate_fn(batch):
    texts, labels = zip(*batch)
    lengths = torch.tensor([len(t) for t in texts])
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=0)
    labels = torch.stack(labels)
    return texts_padded, lengths, labels

class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, lengths):
        embedded = self.embedding(x)

        # enforce_sorted=False lets PyTorch sort and unsort the batch internally
        packed = pack_padded_sequence(embedded, lengths.cpu(),
                                      batch_first=True, enforce_sorted=False)
        packed_output, (hidden, _) = self.lstm(packed)

        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)
        out = self.dropout(hidden_cat)
        out = self.fc(out)
        return out

vocab = {'<pad>': 0, '<unk>': 1}
for i, word in enumerate(['good', 'bad', 'great', 'terrible', 'excellent', 'poor', 'movie', 'film', 'acting', 'story'], 2):
    vocab[word] = i

texts = [
    "good movie great acting",
    "terrible film poor story",
    "excellent movie great story",
    "bad acting poor film"
]
labels = [1, 0, 1, 0]

dataset = SentimentDataset(texts, labels, vocab)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

vocab_size = len(vocab)
embedding_dim = 50
hidden_size = 64
num_layers = 2
num_classes = 2

model = SentimentLSTM(vocab_size, embedding_dim, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("训练情感分析模型...")
for epoch in range(10):
model.train()
total_loss = 0

for texts, lengths, labels in dataloader:
optimizer.zero_grad()
outputs = model(texts, lengths)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
total_loss += loss.item()

if (epoch + 1) % 2 == 0:
print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")

print("\n模型训练完成!")

Training Tips

Gradient Clipping

Gradient clipping prevents exploding gradients during RNN training:

import torch
import torch.nn as nn
import torch.optim as optim

model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

x = torch.randn(32, 10, 10)
y = torch.randn(32, 10, 20)

output, _ = model(x)
loss = criterion(output, y)

optimizer.zero_grad()
loss.backward()

torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

optimizer.step()

print("梯度裁剪完成")

Learning Rate Scheduling

import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=0.001)

scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5, verbose=True
)

for epoch in range(100):
    train_loss = train_one_epoch()  # assumed user-defined training helper
    val_loss = validate()           # assumed user-defined validation helper

    scheduler.step(val_loss)

    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch}, LR: {current_lr}")

Teacher Forcing

Used when training sequence-to-sequence models:

import torch
import torch.nn as nn
import random

def train_with_teacher_forcing(model, src, trg, criterion, optimizer, teacher_forcing_ratio=0.5):
    model.train()

    batch_size = src.size(0)
    trg_len = trg.size(1)
    trg_vocab_size = model.decoder.fc.out_features

    outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(src.device)

    _, hidden, cell = model.encoder(src)

    input = trg[:, 0].unsqueeze(1)

    for t in range(1, trg_len):
        output, hidden, cell = model.decoder(input, hidden, cell)
        outputs[:, t] = output.squeeze(1)

        # feed either the ground-truth token (teacher forcing) or the model's prediction
        use_teacher_forcing = random.random() < teacher_forcing_ratio
        if use_teacher_forcing:
            input = trg[:, t].unsqueeze(1)
        else:
            input = output.argmax(2)

    # skip position 0 (the start token) when computing the loss
    loss = criterion(outputs[:, 1:].reshape(-1, trg_vocab_size),
                     trg[:, 1:].reshape(-1))

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    return loss.item()

Common Issues

Issue 1: Vanishing/Exploding Gradients

Symptoms: unstable training, loss becomes NaN

Solutions

  • Replace plain RNN with LSTM/GRU
  • Apply gradient clipping
  • Use a suitable weight initialization (see the sketch below)
  • Use BatchNorm or LayerNorm
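As a sketch of the weight-initialization point, one common recipe is orthogonal initialization for the recurrent (hidden-to-hidden) weights, Xavier initialization for the input-to-hidden weights, and zero biases:

import torch.nn as nn

# initialization sketch for an LSTM's parameters
lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2, batch_first=True)
for name, param in lstm.named_parameters():
    if 'weight_hh' in name:
        nn.init.orthogonal_(param)       # recurrent weights
    elif 'weight_ih' in name:
        nn.init.xavier_uniform_(param)   # input weights
    elif 'bias' in name:
        nn.init.zeros_(param)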

Issue 2: Slow Training

Symptoms: RNN training takes too long

Solutions

  • Use pack_padded_sequence for variable-length sequences
  • Reduce the number of layers or hidden units
  • Replace LSTM with GRU
  • Use a larger batch size

Issue 3: Overfitting

Symptoms: high training accuracy, low validation accuracy

Solutions

  • Increase dropout
  • Use weight decay
  • Collect more training data
  • Use early stopping (see the sketch below)
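As a sketch of the last two points, a simple training loop with weight decay and early stopping could look like this (model, train_one_epoch, and evaluate are assumed to be defined elsewhere):

import torch

# weight decay is L2 regularization applied through the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

best_val_loss = float('inf')
patience, bad_epochs = 5, 0

for epoch in range(100):
    train_one_epoch(model, optimizer)   # assumed user-defined helper
    val_loss = evaluate(model)          # assumed user-defined helper

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        bad_epochs = 0
        torch.save(model.state_dict(), 'best_model.pt')  # keep the best checkpoint
    else:
        bad_epochs += 1
        if bad_epochs >= patience:
            print(f"Early stopping at epoch {epoch}")
            break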

Summary

In this chapter we covered:

  1. RNN basics: processing sequence data by passing a hidden state between time steps
  2. LSTM/GRU: gating mechanisms that address long-term dependencies
  3. Bidirectional RNNs: using both past and future context
  4. Sequence models: text classification, sequence labeling, sequence-to-sequence
  5. Variable-length sequences: using pack_padded_sequence
  6. Training tips: gradient clipping, learning rate scheduling, teacher forcing

References