循环神经网络 (RNN/LSTM)
循环神经网络(Recurrent Neural Network,RNN)是处理序列数据的核心架构。本章介绍 RNN 的基本原理以及 LSTM、GRU 等改进模型,并通过实际案例展示如何在 TensorFlow 中使用它们。
序列数据与 RNN 原理
什么是序列数据
序列数据是指具有时间或顺序依赖关系的数据,每个数据点的位置很重要:
- 文本:单词的顺序决定语义
- 语音:音频信号的时序特征
- 时间序列:股票价格、天气数据、传感器读数
- 视频:连续的图像帧
与传统神经网络不同,RNN 具有"记忆"能力,能够利用历史信息处理当前输入。
RNN 的基本结构
RNN 的核心思想是在处理序列时维护一个隐藏状态(hidden state),这个状态会随着序列的推进不断更新:
import tensorflow as tf
from tensorflow.keras import layers
# A basic SimpleRNN layer: processes the sequence step by step while
# carrying a hidden state of `units` dimensions.
rnn_layer = layers.SimpleRNN(
    units=32, # number of hidden units (size of the hidden state)
    return_sequences=False, # whether to return the output at every timestep
    return_state=False, # whether to also return the final hidden state
    input_shape=(10, 8) # (timesteps, feature dimension)
)
# Input shape: (batch_size, timesteps, features)
inputs = tf.random.normal([4, 10, 8]) # 4 samples, 10 steps each, 8 features per step
outputs = rnn_layer(inputs)
print(f"输出形状: {outputs.shape}") # (4, 32) — only the last timestep's output
RNN 的计算过程可以用以下公式表示:

$$h_t = \tanh(W_{hh} h_{t-1} + W_{xh} x_t + b_h)$$

其中 $h_t$ 是当前隐藏状态,$h_{t-1}$ 是前一时刻隐藏状态,$x_t$ 是当前输入。
RNN 的局限性
简单 RNN 存在梯度消失问题,难以学习长距离依赖。考虑一个长序列,梯度需要通过时间步逐层反向传播,当序列很长时,梯度会指数级衰减,导致网络无法有效学习早期信息。
# Demonstrate vanishing gradients: backpropagation through time multiplies
# the gradient by a factor < 1 at every step, so it decays exponentially
# with sequence length.
import numpy as np
def rnn_gradient_demo(decay=0.5, steps=20):
    """Simulate gradient decay through time.

    Args:
        decay: per-step multiplicative factor (< 1 models a vanishing
            gradient; defaults to the original demo's 0.5).
        steps: number of time steps to backpropagate through.

    Returns:
        The gradient magnitude after `steps` steps (decay ** steps).
    """
    grad = 1.0
    for t in range(steps):
        grad = grad * decay
        print(f"第 {t+1} 步后的梯度: {grad:.6f}")
    return grad
rnn_gradient_demo()
# After ~10 steps the gradient is already tiny —
# this is why a plain RNN struggles to learn long-range dependencies.
LSTM 长短时记忆网络
LSTM(Long Short-Term Memory)通过门控机制解决了梯度消失问题,能够学习长距离依赖关系。
LSTM 的门控机制
LSTM 引入了三个门:遗忘门、输入门和输出门,以及一个细胞状态(cell state):
# LSTM layer: gated recurrent layer that mitigates vanishing gradients.
lstm_layer = layers.LSTM(
    units=64,
    return_sequences=True, # return the output at every timestep
    return_state=True, # also return the final hidden and cell states
    input_shape=(None, 32) # variable number of timesteps
)
# Input batch: 2 samples, 10 timesteps, 32 features each
inputs = tf.random.normal([2, 10, 32])
# With return_sequences=True and return_state=True the layer returns
# (full output sequence, final hidden state h, final cell state c).
outputs, final_state_h, final_state_c = lstm_layer(inputs)
print(f"输出形状: {outputs.shape}") # (2, 10, 64)
print(f"最终隐藏状态: {final_state_h.shape}") # (2, 64)
print(f"最终细胞状态: {final_state_c.shape}") # (2, 64)
LSTM 的计算过程:

- 遗忘门:决定从细胞状态中丢弃哪些信息:$f_t = \sigma(W_f \cdot [h_{t-1}, x_t] + b_f)$
- 输入门:决定哪些新信息将被存储:$i_t = \sigma(W_i \cdot [h_{t-1}, x_t] + b_i)$,候选值 $\tilde{C}_t = \tanh(W_C \cdot [h_{t-1}, x_t] + b_C)$
- 更新细胞状态:$C_t = f_t * C_{t-1} + i_t * \tilde{C}_t$
- 输出门:决定输出什么:$o_t = \sigma(W_o \cdot [h_{t-1}, x_t] + b_o)$,最终 $h_t = o_t * \tanh(C_t)$
return_sequences 参数
return_sequences 决定输出是只返回最后时刻的结果,还是返回完整序列:
# Return only the output at the last timestep
lstm_last = layers.LSTM(64, return_sequences=False)
inputs = tf.random.normal([2, 10, 32])
output = lstm_last(inputs)
print(f"最后时刻输出: {output.shape}") # (2, 64)
# Return the full output sequence (one vector per timestep)
lstm_seq = layers.LSTM(64, return_sequences=True)
output_seq = lstm_seq(inputs)
print(f"完整序列输出: {output_seq.shape}") # (2, 10, 64)
# When stacking LSTM layers, every layer except the last needs
# return_sequences=True so the next layer receives a full sequence.
model = tf.keras.Sequential([
    layers.LSTM(64, return_sequences=True, input_shape=(10, 32)),
    layers.LSTM(32, return_sequences=False),
    layers.Dense(10, activation='softmax')
])
GRU 门控循环单元
GRU(Gated Recurrent Unit)是 LSTM 的简化版本,参数更少,计算更快:
# GRU layer: gated recurrent unit with only reset and update gates,
# fewer parameters than LSTM.
gru_layer = layers.GRU(
    units=64,
    return_sequences=True,
    input_shape=(None, 32) # variable-length sequences of 32 features
)
inputs = tf.random.normal([2, 10, 32])
outputs = gru_layer(inputs)
print(f"GRU 输出形状: {outputs.shape}") # (2, 10, 64)
GRU 只有重置门和更新门,结构更简单:
# Compare parameter counts of LSTM vs GRU with the same number of units.
lstm_model = tf.keras.Sequential([
    layers.LSTM(64, input_shape=(10, 32))
])
gru_model = tf.keras.Sequential([
    layers.GRU(64, input_shape=(10, 32))
])
print(f"LSTM 参数量: {lstm_model.count_params()}")
print(f"GRU 参数量: {gru_model.count_params()}")
# GRU has 3 gate weight matrices vs LSTM's 4, so roughly 3/4 the parameters.
双向 RNN
双向 RNN 同时考虑过去和未来的上下文信息:
# Bidirectional LSTM: runs one LSTM forward and one backward over the
# sequence and concatenates their outputs.
model = tf.keras.Sequential([
    layers.Bidirectional(
        layers.LSTM(64, return_sequences=True),
        input_shape=(10, 32)
    ),
    layers.Bidirectional(layers.LSTM(32)),
    layers.Dense(10, activation='softmax')
])
# Output feature dimension doubles (forward + backward directions)
inputs = tf.random.normal([2, 10, 32])
outputs = model(inputs)
print(f"双向 LSTM 输出: {outputs.shape}")
双向 RNN 特别适合需要完整上下文的任务,如文本分类、命名实体识别等。
文本分类实战
下面使用 LSTM 实现一个情感分类模型:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
# Toy sentiment dataset: short Chinese reviews with binary labels.
texts = [
    "这部电影非常好看,剧情精彩",
    "服务态度太差了,再也不来了",
    "产品质量很好,性价比高",
    "完全是浪费钱,太失望了",
    "非常满意这次购物体验",
    "物流太慢了,等了很久",
    "客服很专业,解答很详细",
    "东西坏了,客服也不处理",
]
labels = np.array([1, 0, 1, 0, 1, 0, 1, 0]) # 1: positive, 0: negative
# Text vectorization: map raw strings to fixed-length integer id sequences.
vectorize_layer = layers.TextVectorization(
    max_tokens=1000,
    output_mode='int',
    output_sequence_length=20
)
vectorize_layer.adapt(texts) # build the vocabulary from the corpus
# Model: raw string -> token ids -> embeddings -> BiLSTM -> sigmoid score
model = models.Sequential([
    layers.Input(shape=(1,), dtype=tf.string),
    vectorize_layer,
    layers.Embedding(input_dim=1000, output_dim=64),
    layers.Bidirectional(layers.LSTM(64)),
    layers.Dense(64, activation='relu'),
    layers.Dropout(0.5), # regularization — the dataset is tiny
    layers.Dense(1, activation='sigmoid')
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)
# Train (8 samples: demo only, will overfit)
model.fit(
    tf.constant(texts),
    labels,
    epochs=10,
    batch_size=2,
    verbose=1
)
# Predict on unseen texts; score > 0.5 is classified as positive
test_texts = ["这个产品很好用", "太差了"]
predictions = model.predict(tf.constant(test_texts))
for text, pred in zip(test_texts, predictions):
    sentiment = "正面" if pred > 0.5 else "负面"
    print(f"'{text}' -> {sentiment} (置信度: {pred[0]:.2f})")
时间序列预测
使用 LSTM 预测时间序列数据:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
# Generate a batch of synthetic time series: the sum of two sine waves
# with random frequency/phase plus a little uniform noise.
def generate_time_series(n_steps, n_samples):
    """Return an array of shape (n_samples, n_steps, 1), dtype float32."""
    rnd = np.random.rand(4, n_samples, 1)
    f1, f2, off1, off2 = rnd[0], rnd[1], rnd[2], rnd[3]
    t = np.linspace(0, 1, n_steps)
    wave1 = 0.5 * np.sin((t - off1) * (f1 * 10 + 10))
    wave2 = 0.3 * np.sin((t - off2) * (f2 * 20 + 20))
    noise = 0.1 * (np.random.rand(n_samples, n_steps) - 0.5)
    combined = wave1 + wave2 + noise
    return combined[..., np.newaxis].astype(np.float32)
# Generate data: 1000 series of 50 steps each
n_steps = 50
n_samples = 1000
series = generate_time_series(n_steps, n_samples)
# Train/validation split: predict the last step from the first 49
X_train, y_train = series[:700, :n_steps-1], series[:700, -1]
X_valid, y_valid = series[700:, :n_steps-1], series[700:, -1]
print(f"训练集形状: {X_train.shape}") # (700, 49, 1)
print(f"标签形状: {y_train.shape}") # (700, 1)
# Stacked-LSTM regressor: the first layer feeds a full sequence to the second
model = models.Sequential([
    layers.LSTM(50, return_sequences=True, input_shape=[None, 1]),
    layers.LSTM(20),
    layers.Dense(1) # single-value regression output
])
model.compile(optimizer='adam', loss='mse')
# Train
history = model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=32,
    validation_data=(X_valid, y_valid),
    verbose=1
)
# Predict on the validation set
y_pred = model.predict(X_valid)
# Plot true vs. predicted values for the first 50 validation samples
plt.figure(figsize=(12, 4))
plt.plot(y_valid[:50], label='真实值')
plt.plot(y_pred[:50], label='预测值')
plt.legend()
plt.title('时间序列预测结果')
plt.show()
多步预测
预测未来多个时间点:
# Slice a series into sliding (input window, following target window)
# pairs for multi-step forecasting.
def create_multistep_dataset(series, n_steps_in, n_steps_out):
    """Return (X, y): input windows of n_steps_in and the n_steps_out that follow."""
    n_windows = len(series) - n_steps_in - n_steps_out + 1
    inputs = [series[start:start + n_steps_in] for start in range(n_windows)]
    targets = [
        series[start + n_steps_in:start + n_steps_in + n_steps_out]
        for start in range(n_windows)
    ]
    return np.array(inputs), np.array(targets)
# Build a multi-step dataset from one long series (squeezed to 1-D)
series = generate_time_series(100, 1).squeeze()
X, y = create_multistep_dataset(series, n_steps_in=50, n_steps_out=10)
# Encoder-decoder model: encode 50 input steps, decode 10 output steps
model = models.Sequential([
    layers.LSTM(50, activation='relu', input_shape=(50, 1)),
    layers.RepeatVector(10), # repeat the encoder output once per output step
    layers.LSTM(50, activation='relu', return_sequences=True),
    layers.TimeDistributed(layers.Dense(1)) # one prediction per decoded step
])
model.compile(optimizer='adam', loss='mse')
# NOTE(review): X here is 2-D (windows, 50) but the model expects
# (windows, 50, 1) — X likely needs a trailing axis added before fitting.
文本生成
使用 LSTM 生成文本:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
import requests
# Download training text (Project Gutenberg #11: Alice in Wonderland)
url = "https://www.gutenberg.org/files/11/11-0.txt"
text = requests.get(url).text
# Keep only a slice of the text for the demo, lowercased to shrink the vocabulary
text = text[:50000].lower()
# Character-level vocabulary and both lookup directions
chars = sorted(set(text))
char_to_idx = {char: idx for idx, char in enumerate(chars)}
idx_to_char = {idx: char for idx, char in enumerate(chars)}
print(f"字符数量: {len(chars)}")
print(f"文本长度: {len(text)}")
# Build training samples: each input is seq_length characters and the
# target is the character that follows; `step` controls window overlap.
seq_length = 100
step = 3
sequences = []
next_chars = []
for i in range(0, len(text) - seq_length, step):
    sequences.append(text[i:i+seq_length])
    next_chars.append(text[i+seq_length])
print(f"训练序列数量: {len(sequences)}")
# One-hot encode inputs and targets (bool arrays keep memory low)
X = np.zeros((len(sequences), seq_length, len(chars)), dtype=np.bool_)
y = np.zeros((len(sequences), len(chars)), dtype=np.bool_)
for i, sequence in enumerate(sequences):
    for t, char in enumerate(sequence):
        X[i, t, char_to_idx[char]] = 1
    y[i, char_to_idx[next_chars[i]]] = 1
# Model: one LSTM over the one-hot sequence, softmax over the vocabulary
model = models.Sequential([
    layers.LSTM(128, input_shape=(seq_length, len(chars))),
    layers.Dense(len(chars), activation='softmax')
])
model.compile(optimizer='adam', loss='categorical_crossentropy')
# Train
model.fit(X, y, batch_size=128, epochs=10)
# Text generation by iterative temperature sampling
def generate_text(model, seed_text, length=200, temperature=1.0):
    """Generate `length` characters continuing from `seed_text`.

    temperature < 1 sharpens the distribution (more conservative output);
    temperature > 1 flattens it (more random output).
    Assumes len(seed_text) == seq_length — confirm at call sites.
    """
    generated = seed_text
    for _ in range(length):
        # One-hot encode the current window
        x = np.zeros((1, seq_length, len(chars)))
        for t, char in enumerate(seed_text):
            x[0, t, char_to_idx[char]] = 1
        # Predict the next-character probability distribution
        predictions = model.predict(x, verbose=0)[0]
        # Re-weight by temperature: softmax(log(p) / temperature);
        # 1e-10 guards against log(0)
        predictions = np.log(predictions + 1e-10) / temperature
        exp_predictions = np.exp(predictions)
        predictions = exp_predictions / np.sum(exp_predictions)
        # Sample the next character from the re-weighted distribution
        next_idx = np.random.choice(len(chars), p=predictions)
        next_char = idx_to_char[next_idx]
        generated += next_char
        # Slide the window forward by one character
        seed_text = seed_text[1:] + next_char
    return generated
# Generate from a 100-character seed taken from the corpus
seed = text[1000:1100]
print("种子文本:", seed)
print("\n生成文本:")
print(generate_text(model, seed, length=300, temperature=0.5))
序列到序列模型 (Seq2Seq)
Seq2Seq 模型用于机器翻译、文本摘要等任务:
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np  # added: this snippet builds its fake data with NumPy
# Fake parallel corpus: random token ids stand in for real translation data
encoder_input_data = np.random.randint(0, 100, size=(1000, 20)) # source sequences
decoder_input_data = np.random.randint(0, 100, size=(1000, 15)) # decoder input (target shifted right)
decoder_target_data = np.random.randint(0, 100, size=(1000, 15)) # decoder output targets
vocab_size = 100
embedding_dim = 64
latent_dim = 128
# Encoder: embed the source tokens; keep only the final LSTM states
encoder_inputs = layers.Input(shape=(None,))
encoder_embedding = layers.Embedding(vocab_size, embedding_dim)(encoder_inputs)
encoder_lstm = layers.LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)
encoder_states = [state_h, state_c]  # the "context" handed to the decoder
# Decoder: initialized from the encoder states, predicts target tokens
decoder_inputs = layers.Input(shape=(None,))
decoder_embedding = layers.Embedding(vocab_size, embedding_dim)(decoder_inputs)
decoder_lstm = layers.LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)
decoder_dense = layers.Dense(vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)
# Full training model: (source, target-in) -> target-out
model = models.Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
# Train with teacher forcing (decoder sees the ground-truth previous token)
model.fit(
    [encoder_input_data, decoder_input_data],
    decoder_target_data,
    batch_size=64,
    epochs=10
)
# Inference models (reuse the trained layers; the decoder is driven one
# step at a time during decoding)
# Encoder inference model: source sequence -> initial decoder states
encoder_model = models.Model(encoder_inputs, encoder_states)
# Decoder inference model: (previous token, states) -> (token probs, new states)
decoder_state_input_h = layers.Input(shape=(latent_dim,))
decoder_state_input_c = layers.Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_outputs, state_h, state_c = decoder_lstm(
    decoder_embedding, initial_state=decoder_states_inputs
)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = models.Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs] + decoder_states
)
def decode_sequence(input_seq):
    """Greedy-decode one source sequence into a list of target token ids."""
    # Encode the input to get the initial decoder states [h, c]
    states_value = encoder_model.predict(input_seq, verbose=0)
    # Seed the decoder with the start token
    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = 1 # assumes token id 1 is the start token
    decoded_sentence = []
    stop_condition = False
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value, verbose=0
        )
        # Greedy sampling: take the most probable next token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        decoded_sentence.append(sampled_token_index)
        # Stop on length limit, or on the end token (assumes id 2 is EOS)
        if len(decoded_sentence) > 15 or sampled_token_index == 2:
            stop_condition = True
        # Feed the sampled token and the updated states back in
        target_seq = np.zeros((1, 1))
        target_seq[0, 0] = sampled_token_index
        states_value = [h, c]
    return decoded_sentence
注意力机制
注意力机制可以让模型关注序列的不同部分:
class AttentionLayer(layers.Layer):
    """Additive (Bahdanau-style) attention over a sequence of values."""
    def __init__(self, units):
        super(AttentionLayer, self).__init__()
        self.W1 = layers.Dense(units)  # projects the query
        self.W2 = layers.Dense(units)  # projects the values
        self.V = layers.Dense(1)  # scores each timestep with a scalar
    def call(self, query, values):
        # query: (batch_size, hidden_size)
        # values: (batch_size, max_len, hidden_size)
        # Add a time axis so the query broadcasts against every timestep
        query_with_time_axis = tf.expand_dims(query, 1)
        # Attention scores: V(tanh(W1·q + W2·v_t)), one scalar per timestep
        score = self.V(tf.nn.tanh(
            self.W1(query_with_time_axis) + self.W2(values)
        ))
        # Normalize scores into weights along the time axis
        attention_weights = tf.nn.softmax(score, axis=1)
        # Context vector: attention-weighted sum of the values over time
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)
        return context_vector, attention_weights
# Model that attends over all LSTM outputs, using the final state as query
class AttentionModel(models.Model):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(AttentionModel, self).__init__()
        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        self.lstm = layers.LSTM(hidden_dim, return_sequences=True, return_state=True)
        self.attention = AttentionLayer(hidden_dim)
        self.dense = layers.Dense(vocab_size, activation='softmax')
    def call(self, inputs):
        x = self.embedding(inputs)
        lstm_output, state_h, state_c = self.lstm(x)
        # Query = final hidden state; values = outputs at every timestep
        context_vector, attention_weights = self.attention(state_h, lstm_output)
        output = self.dense(context_vector)
        # Returning the weights lets callers visualize what was attended to
        return output, attention_weights
最佳实践
处理变长序列
# Handle variable-length (padded) sequences with masking.
# Option 1: let the Embedding layer generate the mask (token id 0 = padding);
# downstream layers such as LSTM consume the mask automatically.
model = models.Sequential([
    layers.Embedding(1000, 64, mask_zero=True), # 0 is the padding id
    layers.LSTM(64),
    layers.Dense(10, activation='softmax')
])
# Option 2: for non-embedding (e.g. numeric) inputs, insert an explicit
# Masking layer. Note: LSTM itself has no `mask_zero` argument — that
# parameter belongs to Embedding.
masking_model = models.Sequential([
    layers.Masking(mask_value=0.0, input_shape=(None, 8)), # skip all-zero timesteps
    layers.LSTM(64)
])
梯度裁剪
防止梯度爆炸:
# Gradient clipping guards against exploding gradients in RNN training.
optimizer = tf.keras.optimizers.Adam(clipnorm=1.0) # clip by global norm
optimizer = tf.keras.optimizers.Adam(clipvalue=0.5) # clip element-wise by value (rebinds `optimizer` — use one or the other)
model.compile(optimizer=optimizer, loss='mse')
使用 CuDNN 加速
如果使用 GPU,可以使用 CuDNN 优化版本:
# LSTM configured so the fast cuDNN kernel can be used on GPU.
# The cuDNN path requires the default activations plus the settings below.
lstm_layer = layers.LSTM(
    64,
    activation='tanh',
    recurrent_activation='sigmoid',
    use_bias=True,
    recurrent_dropout=0, # the cuDNN kernel does not support recurrent_dropout
    unroll=False, # the cuDNN kernel does not support unrolling
    use_cudnn='auto' # NOTE(review): `use_cudnn` exists only in Keras 3 (TF >= 2.16) — confirm target version
)
小结
本章介绍了循环神经网络的核心内容:
- RNN 基础:理解序列数据和 RNN 的基本原理
- LSTM:门控机制解决长距离依赖问题
- GRU:LSTM 的简化版本,参数更少
- 双向 RNN:同时利用过去和未来信息
- 实际应用:文本分类、时间序列预测、文本生成、Seq2Seq
- 注意力机制:让模型关注重要信息
RNN/LSTM 是处理序列数据的强大工具,但在某些任务中已被 Transformer 架构超越。不过,对于中小规模序列数据,LSTM 仍然是一个高效的选择。下一章我们将学习迁移学习,利用预训练模型加速开发。