Recurrent Neural Networks (RNN/LSTM)
A recurrent neural network (RNN) is a deep learning model for processing sequential data. This chapter covers the basic principles of RNNs, the improvements introduced by LSTM, and how to implement both in PyTorch.
Sequence Data and RNNs
What Is Sequence Data?
Sequence data is data with temporal or ordering dependencies: the value at the current step depends on values at earlier steps. Common examples include:
- Text: word sequences, where each word depends on its context
- Speech: sequences of audio signal frames
- Time series: stock prices, weather measurements, etc.
- Video: sequences of image frames
The Core Idea of RNNs
An RNN passes information between time steps through a hidden state, giving the network a memory of past inputs:

```python
import torch
import torch.nn as nn

rnn = nn.RNN(input_size=10, hidden_size=20, num_layers=1, batch_first=True)
x = torch.randn(32, 5, 10)  # (batch, seq_len, input_size)
output, hidden = rnn(x)

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Hidden state shape: {hidden.shape}")
```
How an RNN Works
At each time step, the RNN computes:

h_t = tanh(W_ih * x_t + W_hh * h_{t-1} + b_h)
y_t = W_hy * h_t + b_y

where:
- x_t is the input at the current time step
- h_{t-1} is the hidden state from the previous time step
- h_t is the hidden state at the current time step
- y_t is the output at the current time step
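To check this formula against PyTorch, the recurrence can be reproduced by hand with `nn.RNNCell` (a single-step RNN). The manual computation matches the built-in one:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
cell = nn.RNNCell(input_size=10, hidden_size=20)  # one RNN time step

x_t = torch.randn(32, 10)      # input at time t
h_prev = torch.zeros(32, 20)   # h_{t-1}

h_t = cell(x_t, h_prev)        # built-in computation

# Manual h_t = tanh(W_ih x_t + b_ih + W_hh h_{t-1} + b_hh)
h_manual = torch.tanh(x_t @ cell.weight_ih.T + cell.bias_ih
                      + h_prev @ cell.weight_hh.T + cell.bias_hh)

print(torch.allclose(h_t, h_manual, atol=1e-6))  # True
```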
The PyTorch RNN Module
Basic Usage

```python
import torch
import torch.nn as nn

rnn = nn.RNN(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    nonlinearity='tanh',
    bias=True,
    batch_first=True,
    dropout=0.1,
    bidirectional=False
)

batch_size = 32
seq_len = 5
input_size = 10

x = torch.randn(batch_size, seq_len, input_size)
h0 = torch.zeros(2, batch_size, 20)  # (num_layers, batch, hidden_size)
output, hidden = rnn(x, h0)

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Final hidden state shape: {hidden.shape}")
```
Parameters
| Parameter | Description | Default |
|---|---|---|
| input_size | Input feature dimension | required |
| hidden_size | Hidden state dimension | required |
| num_layers | Number of stacked RNN layers | 1 |
| nonlinearity | Activation function | 'tanh' |
| bias | Whether to use bias terms | True |
| batch_first | Whether input is (batch, seq, feature) | False |
| dropout | Dropout probability between layers | 0 |
| bidirectional | Whether the RNN is bidirectional | False |
Input and Output Formats

```python
import torch
import torch.nn as nn

rnn = nn.RNN(input_size=10, hidden_size=20, batch_first=True)
x = torch.randn(32, 5, 10)
output, hidden = rnn(x)
print("batch_first=True:")
print(f"  Input:  (batch_size, seq_len, input_size) = {x.shape}")
print(f"  Output: (batch_size, seq_len, hidden_size) = {output.shape}")
print(f"  Hidden: (num_layers, batch_size, hidden_size) = {hidden.shape}")

rnn = nn.RNN(input_size=10, hidden_size=20, batch_first=False)
x = torch.randn(5, 32, 10)
output, hidden = rnn(x)
print("\nbatch_first=False:")
print(f"  Input:  (seq_len, batch_size, input_size) = {x.shape}")
print(f"  Output: (seq_len, batch_size, hidden_size) = {output.shape}")
print(f"  Hidden: (num_layers, batch_size, hidden_size) = {hidden.shape}")
```
LSTM Networks
The Problem LSTM Solves
Standard RNNs suffer from vanishing gradients, which makes long-range dependencies hard to learn. LSTM (Long Short-Term Memory) addresses this with a gating mechanism:
- Forget gate: decides which parts of the past state to discard
- Input gate: decides which new information to write into the state
- Output gate: decides which parts of the state to expose as output
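The three gates can be made concrete by unpacking `nn.LSTMCell` and computing one step by hand. PyTorch stores the gate weights stacked in (input, forget, candidate, output) order, so splitting the pre-activation into four chunks recovers each gate:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
cell = nn.LSTMCell(input_size=10, hidden_size=20)

x_t = torch.randn(32, 10)
h_prev = torch.zeros(32, 20)
c_prev = torch.zeros(32, 20)

h_t, c_t = cell(x_t, (h_prev, c_prev))  # built-in computation

# Manual gate computation; weights are stacked in (i, f, g, o) order
gates = x_t @ cell.weight_ih.T + cell.bias_ih + h_prev @ cell.weight_hh.T + cell.bias_hh
i, f, g, o = gates.chunk(4, dim=1)
i, f, o = torch.sigmoid(i), torch.sigmoid(f), torch.sigmoid(o)
g = torch.tanh(g)                      # candidate cell state

c_manual = f * c_prev + i * g          # forget old state, write new information
h_manual = o * torch.tanh(c_manual)    # output gate filters the cell state

print(torch.allclose(c_t, c_manual, atol=1e-6))
print(torch.allclose(h_t, h_manual, atol=1e-6))
```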
The LSTM Cell
Schematic of an LSTM cell (each gate reads both h_{t-1} and x_t):

c_{t-1} ──────────┬──────────────────────────→ c_t
                  │
h_{t-1} ──→ [forget gate] ──┐
            [input gate]  ──┼──→ [tanh] ──→ updated cell state c_t
            [output gate] ──┘
                  │
x_t ──────────────┘

h_t = output gate × tanh(c_t)
Using LSTM in PyTorch

```python
import torch
import torch.nn as nn

lstm = nn.LSTM(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    bias=True,
    batch_first=True,
    dropout=0.1,
    bidirectional=False
)

batch_size = 32
seq_len = 5
input_size = 10

x = torch.randn(batch_size, seq_len, input_size)
h0 = torch.zeros(2, batch_size, 20)  # (num_layers, batch, hidden_size)
c0 = torch.zeros(2, batch_size, 20)
output, (hidden, cell) = lstm(x, (h0, c0))

print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Hidden state shape: {hidden.shape}")
print(f"Cell state shape: {cell.shape}")
```
LSTM vs RNN

```python
import torch
import torch.nn as nn

input_size = 10
hidden_size = 20
batch_size = 32
seq_len = 5

x = torch.randn(batch_size, seq_len, input_size)

rnn = nn.RNN(input_size, hidden_size, batch_first=True)
lstm = nn.LSTM(input_size, hidden_size, batch_first=True)

rnn_output, rnn_hidden = rnn(x)
lstm_output, (lstm_hidden, lstm_cell) = lstm(x)  # LSTM additionally returns a cell state

print("RNN:")
print(f"  Output: {rnn_output.shape}")
print(f"  Hidden state: {rnn_hidden.shape}")
print("\nLSTM:")
print(f"  Output: {lstm_output.shape}")
print(f"  Hidden state: {lstm_hidden.shape}")
print(f"  Cell state: {lstm_cell.shape}")
```
GRU Networks
GRU (Gated Recurrent Unit) is a simplified variant of the LSTM: it merges the cell and hidden states and uses two gates instead of three, so it has fewer parameters and trains faster:

```python
import torch
import torch.nn as nn

gru = nn.GRU(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    dropout=0.1
)

x = torch.randn(32, 5, 10)
output, hidden = gru(x)
print(f"GRU output shape: {output.shape}")
print(f"GRU hidden state shape: {hidden.shape}")
```
Comparing RNN, LSTM, and GRU
| Property | RNN | LSTM | GRU |
|---|---|---|---|
| Parameter count | fewest | most | medium |
| Training speed | fastest | slowest | medium |
| Long-range dependencies | poor | good | good |
| Number of gates | 0 | 3 | 2 |
| Typical use | short sequences | long sequences | long sequences |
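The parameter-count row follows directly from the gate counts: an LSTM layer has four weight blocks of the same shape as a plain RNN's single block, and a GRU has three. A quick check:

```python
import torch.nn as nn

def n_params(m):
    return sum(p.numel() for p in m.parameters())

rnn  = nn.RNN(input_size=10, hidden_size=20, batch_first=True)
gru  = nn.GRU(input_size=10, hidden_size=20, batch_first=True)
lstm = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)

# Plain RNN: 1 weight block; GRU: 3 blocks (3x); LSTM: 4 blocks (4x)
print(n_params(rnn), n_params(gru), n_params(lstm))  # 640 1920 2560
```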
Bidirectional RNNs
A bidirectional RNN processes the sequence in both directions, so every position sees both past and future context:

```python
import torch
import torch.nn as nn

bilstm = nn.LSTM(
    input_size=10,
    hidden_size=20,
    num_layers=2,
    batch_first=True,
    bidirectional=True
)

x = torch.randn(32, 5, 10)
output, (hidden, cell) = bilstm(x)

print(f"Bidirectional LSTM output shape: {output.shape}")
print(f"  Note: last dim is hidden_size * 2 = {20 * 2}")
print(f"Hidden state shape: {hidden.shape}")
print(f"  Note: first dim is num_layers * 2 = {2 * 2}")
```
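To see how the two directions are laid out, split the output's last dimension in half and view the hidden state as (num_layers, num_directions, batch, hidden_size). The forward direction's final hidden state equals the forward output at the last time step; the backward direction's equals the backward output at the first step. A single-layer sketch:

```python
import torch
import torch.nn as nn

bilstm = nn.LSTM(input_size=10, hidden_size=20, batch_first=True, bidirectional=True)
x = torch.randn(32, 5, 10)
output, (hidden, cell) = bilstm(x)

fwd = output[:, :, :20]   # forward direction
bwd = output[:, :, 20:]   # backward direction

# hidden: (num_layers * num_directions, batch, hidden) -> separate the directions
h = hidden.view(1, 2, 32, 20)  # (num_layers, num_directions, batch, hidden)

print(torch.allclose(fwd[:, -1], h[0, 0]))  # forward final state
print(torch.allclose(bwd[:, 0], h[0, 1]))   # backward final state
```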
Building Sequence Models
Text Classification

```python
import torch
import torch.nn as nn

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, (hidden, _) = self.lstm(embedded)
        # Concatenate the top layer's final forward and backward hidden states
        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)
        out = self.dropout(hidden_cat)
        out = self.fc(out)
        return out

vocab_size = 10000
embedding_dim = 128
hidden_size = 256
num_layers = 2
num_classes = 5

model = TextClassifier(vocab_size, embedding_dim, hidden_size, num_layers, num_classes)
x = torch.randint(0, vocab_size, (32, 50))  # batch of 32 sequences, 50 tokens each
output = model(x)
print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
```
Sequence Labeling

```python
import torch
import torch.nn as nn

class SequenceLabeling(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_tags, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.fc = nn.Linear(hidden_size * 2, num_tags)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)   # one output per time step
        lstm_out = self.dropout(lstm_out)
        out = self.fc(lstm_out)             # tag scores for every token
        return out

vocab_size = 10000
embedding_dim = 128
hidden_size = 256
num_layers = 2
num_tags = 5

model = SequenceLabeling(vocab_size, embedding_dim, hidden_size, num_layers, num_tags)
x = torch.randint(0, vocab_size, (32, 50))
output = model(x)
print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
```
Sequence-to-Sequence Models

```python
import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)

    def forward(self, x):
        embedded = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, dropout=dropout if num_layers > 1 else 0)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, hidden, cell):
        embedded = self.dropout(self.embedding(x))
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        output = self.fc(output)
        return output, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.size(0)
        trg_len = trg.size(1)
        trg_vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        # The encoder's final states initialize the decoder
        _, hidden, cell = self.encoder(src)
        # The first decoder input is the start token (position 0 of trg)
        input = trg[:, 0].unsqueeze(1)
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output.squeeze(1)
            # With probability teacher_forcing_ratio, feed the ground-truth token next
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(2)
            input = trg[:, t].unsqueeze(1) if teacher_force else top1
        return outputs

vocab_size = 10000
embedding_dim = 256
hidden_size = 512
num_layers = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

encoder = Encoder(vocab_size, embedding_dim, hidden_size, num_layers)
decoder = Decoder(vocab_size, embedding_dim, hidden_size, num_layers)
seq2seq = Seq2Seq(encoder, decoder, device).to(device)

src = torch.randint(0, vocab_size, (32, 20)).to(device)
trg = torch.randint(0, vocab_size, (32, 15)).to(device)
output = seq2seq(src, trg)
print(f"Seq2Seq output shape: {output.shape}")
```
Handling Variable-Length Sequences
Using pack_padded_sequence
When the sequences in a batch have different lengths, packing lets the RNN skip padded positions, which is both faster and avoids polluting the hidden state with padding:

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

lstm = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)

batch_size = 4
max_seq_len = 6
input_size = 10

x = torch.randn(batch_size, max_seq_len, input_size)
lengths = torch.tensor([6, 4, 3, 1])  # actual length of each sequence

# pack_padded_sequence expects sequences sorted by length (by default)
sorted_lengths, sorted_indices = torch.sort(lengths, descending=True)
sorted_x = x[sorted_indices]

packed_x = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)
packed_output, (hidden, cell) = lstm(packed_x)
output, output_lengths = pad_packed_sequence(packed_output, batch_first=True)

print(f"Original input shape: {x.shape}")
print(f"Sequence lengths: {lengths.tolist()}")
print(f"Unpacked output shape: {output.shape}")
print(f"Output lengths: {output_lengths.tolist()}")
```
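The manual sorting can be avoided: `pack_padded_sequence` accepts `enforce_sorted=False`, which sorts internally and restores the original batch order when unpacking. A sketch with the same data:

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

lstm = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
x = torch.randn(4, 6, 10)
lengths = torch.tensor([6, 4, 3, 1])

# enforce_sorted=False handles sorting internally; the unpacked output
# comes back in the original batch order
packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
packed_out, (hidden, cell) = lstm(packed)
output, out_lengths = pad_packed_sequence(packed_out, batch_first=True)

print(output.shape)          # torch.Size([4, 6, 20])
print(out_lengths.tolist())  # [6, 4, 3, 1]
```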
A Complete Variable-Length Pipeline

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class VarLenLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers,
                            batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x, lengths):
        # Sort by length (required with the default enforce_sorted=True)
        sorted_lengths, sorted_indices = torch.sort(lengths, descending=True)
        sorted_x = x[sorted_indices]
        packed_x = pack_padded_sequence(sorted_x, sorted_lengths.cpu(), batch_first=True)
        packed_output, (hidden, _) = self.lstm(packed_x)
        output, _ = pad_packed_sequence(packed_output, batch_first=True)
        # Restore the original batch order
        _, unsorted_indices = torch.sort(sorted_indices)
        output = output[unsorted_indices]
        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)
        hidden_cat = hidden_cat[unsorted_indices]
        out = self.fc(hidden_cat)
        return out

model = VarLenLSTM(input_size=10, hidden_size=20, num_layers=2, num_classes=5)
x = torch.randn(4, 6, 10)
lengths = torch.tensor([6, 4, 3, 1])
output = model(x, lengths)
print(f"Output shape: {output.shape}")
```
Worked Example: Sentiment Analysis

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=100):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        tokens = text.lower().split()[:self.max_len]
        indices = [self.vocab.get(token, self.vocab['<unk>']) for token in tokens]
        return torch.tensor(indices), torch.tensor(label)

def collate_fn(batch):
    texts, labels = zip(*batch)
    lengths = torch.tensor([len(t) for t in texts])
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=0)
    labels = torch.stack(labels)
    return texts_padded, lengths, labels

class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_layers, num_classes, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers,
                            batch_first=True, bidirectional=True, dropout=dropout)
        self.fc = nn.Linear(hidden_size * 2, num_classes)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed = pack_padded_sequence(embedded, lengths.cpu(),
                                      batch_first=True, enforce_sorted=False)
        packed_output, (hidden, _) = self.lstm(packed)
        hidden_cat = torch.cat([hidden[-2], hidden[-1]], dim=1)
        out = self.dropout(hidden_cat)
        out = self.fc(out)
        return out

# Toy vocabulary and dataset
vocab = {'<pad>': 0, '<unk>': 1}
for i, word in enumerate(['good', 'bad', 'great', 'terrible', 'excellent', 'poor', 'movie', 'film', 'acting', 'story'], 2):
    vocab[word] = i

texts = [
    "good movie great acting",
    "terrible film poor story",
    "excellent movie great story",
    "bad acting poor film"
]
labels = [1, 0, 1, 0]  # 1 = positive, 0 = negative

dataset = SentimentDataset(texts, labels, vocab)
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)

vocab_size = len(vocab)
embedding_dim = 50
hidden_size = 64
num_layers = 2
num_classes = 2

model = SentimentLSTM(vocab_size, embedding_dim, hidden_size, num_layers, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

print("Training the sentiment model...")
for epoch in range(10):
    model.train()
    total_loss = 0
    for texts, lengths, labels in dataloader:
        optimizer.zero_grad()
        outputs = model(texts, lengths)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    if (epoch + 1) % 2 == 0:
        print(f"Epoch {epoch+1}, Loss: {total_loss/len(dataloader):.4f}")
print("\nTraining finished!")
```
Training Tips
Gradient Clipping
Gradient clipping prevents exploding gradients during RNN training:

```python
import torch
import torch.nn as nn
import torch.optim as optim

model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

x = torch.randn(32, 10, 10)
y = torch.randn(32, 10, 20)

output, _ = model(x)
loss = criterion(output, y)
optimizer.zero_grad()
loss.backward()
# Rescale gradients so their total norm is at most max_norm
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
print("Gradient clipping applied")
```
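`clip_grad_norm_` returns the total gradient norm measured before clipping, which is handy for monitoring; after the call, the total norm is at most `max_norm`. A quick check (the tight `max_norm=0.5` here is only for illustration):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
out, _ = model(torch.randn(8, 10, 10))
out.sum().backward()

# Returns the pre-clipping norm; gradients are rescaled in place
norm_before = torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
norm_after = torch.norm(torch.stack([p.grad.norm() for p in model.parameters()]))
print(f"before: {norm_before:.3f}, after: {norm_after:.3f}")
```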
Learning-Rate Scheduling

```python
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Halve the learning rate when the validation loss stops improving
scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=5
)

for epoch in range(100):
    train_loss = train_one_epoch()   # placeholder for your training loop
    val_loss = validate()            # placeholder for your validation loop
    scheduler.step(val_loss)
    current_lr = optimizer.param_groups[0]['lr']
    print(f"Epoch {epoch}, LR: {current_lr}")
```
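The effect of ReduceLROnPlateau can be seen with synthetic validation losses: once the loss fails to improve for more than `patience` consecutive epochs, the learning rate is multiplied by `factor`. The values below are illustrative:

```python
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler

model = nn.LSTM(input_size=10, hidden_size=20, batch_first=True)
optimizer = optim.Adam(model.parameters(), lr=0.1)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

# Losses improve twice, then plateau; the plateau triggers one reduction
for val_loss in [1.0, 0.8, 0.8, 0.8, 0.8, 0.8]:
    scheduler.step(val_loss)

print(optimizer.param_groups[0]['lr'])  # 0.05
```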
Teacher Forcing
Used when training sequence-to-sequence models: with some probability, the ground-truth token (rather than the model's own prediction) is fed as the next decoder input.

```python
import torch
import torch.nn as nn
import random

def train_with_teacher_forcing(model, src, trg, criterion, optimizer, teacher_forcing_ratio=0.5):
    model.train()
    batch_size = src.size(0)
    trg_len = trg.size(1)
    trg_vocab_size = model.decoder.fc.out_features
    outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(src.device)

    _, hidden, cell = model.encoder(src)
    input = trg[:, 0].unsqueeze(1)  # start token
    for t in range(1, trg_len):
        output, hidden, cell = model.decoder(input, hidden, cell)
        outputs[:, t] = output.squeeze(1)
        use_teacher_forcing = random.random() < teacher_forcing_ratio
        if use_teacher_forcing:
            input = trg[:, t].unsqueeze(1)  # ground-truth token
        else:
            input = output.argmax(2)        # model's own prediction

    # Skip position 0 (the start token) when computing the loss
    loss = criterion(outputs[:, 1:].reshape(-1, trg_vocab_size),
                     trg[:, 1:].reshape(-1))
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()
    return loss.item()
```
Common Problems
Problem 1: Vanishing/Exploding Gradients
Symptoms: unstable training, loss becoming NaN
Solutions:
- Use LSTM/GRU instead of a plain RNN
- Apply gradient clipping
- Use a suitable weight initialization
- Use BatchNorm or LayerNorm
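One common form of "suitable weight initialization" for recurrent nets is orthogonal initialization of the recurrent (hidden-to-hidden) weights. The scheme below is one reasonable choice, not the only one:

```python
import torch
import torch.nn as nn

lstm = nn.LSTM(input_size=10, hidden_size=20, num_layers=2, batch_first=True)

for name, param in lstm.named_parameters():
    if 'weight_hh' in name:
        nn.init.orthogonal_(param)       # recurrent weights: orthogonal
    elif 'weight_ih' in name:
        nn.init.xavier_uniform_(param)   # input weights: Xavier
    elif 'bias' in name:
        nn.init.zeros_(param)
```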
Problem 2: Slow Training
Symptoms: RNN training takes too long
Solutions:
- Use pack_padded_sequence for variable-length sequences
- Reduce the number of layers or hidden units
- Use GRU instead of LSTM
- Use a larger batch size
Problem 3: Overfitting
Symptoms: high training accuracy but low validation accuracy
Solutions:
- Increase dropout
- Apply weight decay
- Add more training data
- Use early stopping
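The early-stopping item takes only a few lines of bookkeeping. A minimal sketch (the EarlyStopping helper is illustrative, not a PyTorch API):

```python
class EarlyStopping:
    """Stop training once validation loss has not improved for `patience` epochs."""
    def __init__(self, patience=5, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float('inf')
        self.bad_epochs = 0

    def step(self, val_loss):
        if val_loss < self.best - self.min_delta:
            self.best = val_loss
            self.bad_epochs = 0
        else:
            self.bad_epochs += 1
        return self.bad_epochs >= self.patience  # True -> stop training

stopper = EarlyStopping(patience=3)
val_losses = [1.0, 0.8, 0.7, 0.71, 0.72, 0.73]  # improvement stalls after epoch 2
for epoch, v in enumerate(val_losses):
    if stopper.step(v):
        print(f"Stopping at epoch {epoch}")
        break
```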
Summary
In this chapter we covered:
- RNN basics: processing sequences by passing a hidden state between time steps
- LSTM/GRU: gating mechanisms that address long-range dependencies
- Bidirectional RNNs: using both past and future context
- Sequence models: text classification, sequence labeling, and sequence-to-sequence
- Variable-length sequences: using pack_padded_sequence
- Training tips: gradient clipping, learning-rate scheduling, and teacher forcing