Model Training
This chapter walks through the model-training workflow in TensorFlow, covering loss functions, optimizers, training loops, callbacks, and model evaluation.
Training Workflow Overview
The standard TensorFlow training workflow consists of the following steps:
┌─────────────────────────────────────────────────────────────┐
│                      Training Workflow                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. Data preparation   → tf.data.Dataset                    │
│         ↓                                                   │
│  2. Model creation     → keras.Model                        │
│         ↓                                                   │
│  3. Model compilation  → compile(optimizer, loss, metrics)  │
│         ↓                                                   │
│  4. Model training     → fit()                              │
│         ↓                                                   │
│  5. Model evaluation   → evaluate()                         │
│         ↓                                                   │
│  6. Prediction         → predict()                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘
Loss Functions
A loss function measures the discrepancy between the model's predictions and the ground truth; it is the objective minimized during training.
Common Loss Functions
import tensorflow as tf
from tensorflow import keras
# Regression losses
# Mean squared error (MSE) - the most common choice for regression
mse_loss = keras.losses.MeanSquaredError()
y_true = tf.constant([1.0, 2.0, 3.0])
y_pred = tf.constant([1.1, 1.9, 3.2])
print(f"MSE: {mse_loss(y_true, y_pred)}")  # compute the mean squared error
# Mean absolute error (MAE) - more robust to outliers
mae_loss = keras.losses.MeanAbsoluteError()
# Huber loss - combines the strengths of MSE and MAE
huber_loss = keras.losses.Huber(delta=1.0)
# Classification losses
# Cross-entropy for integer labels
scce_loss = keras.losses.SparseCategoricalCrossentropy(from_logits=False)
y_true_class = tf.constant([0, 1, 2])  # integer labels
y_pred_prob = tf.constant([[0.9, 0.05, 0.05],
                           [0.05, 0.9, 0.05],
                           [0.05, 0.05, 0.9]])  # probability distributions
# Binary cross-entropy
bce_loss = keras.losses.BinaryCrossentropy()
# Categorical cross-entropy (one-hot labels)
cce_loss = keras.losses.CategoricalCrossentropy()
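To make the Huber loss concrete, here is a small pure-NumPy sketch of the same piecewise formula (quadratic for errors up to `delta`, linear beyond); the function name `huber` is ours, not part of Keras:

```python
import numpy as np

def huber(y_true, y_pred, delta=1.0):
    """Huber loss: quadratic for small errors, linear for large ones, then averaged."""
    err = np.abs(np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float))
    quadratic = 0.5 * err ** 2               # used where |error| <= delta
    linear = delta * err - 0.5 * delta ** 2  # used where |error| >  delta
    return np.mean(np.where(err <= delta, quadratic, linear))

# error 0.5 -> 0.125 (quadratic branch), error 2.0 -> 1.5 (linear branch); mean = 0.8125
print(huber([0.0, 0.0], [0.5, 2.0]))
```

The linear branch is what makes Huber less sensitive to outliers than MSE: a single large error contributes proportionally to its size rather than to its square.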
Choosing a Loss Function

| Task | Recommended loss | Notes |
|---|---|---|
| Binary classification | BinaryCrossentropy | use sigmoid in the output layer |
| Multi-class (integer labels) | SparseCategoricalCrossentropy | labels are integers |
| Multi-class (one-hot) | CategoricalCrossentropy | labels are one-hot encoded |
| Regression | MSE / MAE | continuous-value prediction |
| Regression (with outliers) | Huber / MAE | robust to outliers |
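To see what SparseCategoricalCrossentropy with from_logits=False actually computes, here is a pure-NumPy sketch: the mean of the negative log-probability assigned to each sample's true class. The function name is ours; the inputs mirror the example above.

```python
import numpy as np

def sparse_categorical_crossentropy(y_true, y_pred_prob):
    """Mean of -log(probability assigned to the true class) over all samples."""
    probs = np.asarray(y_pred_prob, dtype=float)
    # pick the predicted probability of the correct class for each sample
    picked = probs[np.arange(len(y_true)), y_true]
    return -np.mean(np.log(picked))

y_true = [0, 1, 2]
y_pred = [[0.9, 0.05, 0.05],
          [0.05, 0.9, 0.05],
          [0.05, 0.05, 0.9]]
print(sparse_categorical_crossentropy(y_true, y_pred))  # -log(0.9) ≈ 0.1054
```

With from_logits=True, the loss would first apply softmax to raw scores before taking the log, which is more numerically stable than passing probabilities.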
Custom Loss Functions
import tensorflow as tf
def custom_mse(y_true, y_pred):
    """Custom mean squared error."""
    return tf.reduce_mean(tf.square(y_true - y_pred))
def weighted_mse(weight):
    """Mean squared error scaled by a fixed weight."""
    def loss(y_true, y_pred):
        return tf.reduce_mean(weight * tf.square(y_true - y_pred))
    return loss
# assumes `model` has already been built
model.compile(
    optimizer='adam',
    loss=custom_mse  # use the custom loss function
)
model.compile(
    optimizer='adam',
    loss=weighted_mse(2.0)  # use a parameterized custom loss
)
Optimizers
An optimizer updates the model parameters from the gradients; choosing a suitable optimizer is crucial for good training results.
Common Optimizers
import tensorflow as tf
from tensorflow import keras
# SGD (stochastic gradient descent)
sgd = keras.optimizers.SGD(
    learning_rate=0.01,   # learning rate
    momentum=0.9,         # momentum
    nesterov=False,       # whether to use Nesterov momentum
    weight_decay=None     # weight decay (L2 regularization)
)
# Adam (adaptive moment estimation) - the most widely used optimizer
adam = keras.optimizers.Adam(
    learning_rate=0.001,  # learning rate
    beta_1=0.9,           # exponential decay rate for the first-moment estimates
    beta_2=0.999,         # exponential decay rate for the second-moment estimates
    epsilon=1e-07,        # numerical stability constant
    amsgrad=False         # whether to use the AMSGrad variant
)
# AdamW (Adam with decoupled weight decay)
adamw = keras.optimizers.AdamW(
    learning_rate=0.001,
    weight_decay=0.01     # weight decay coefficient
)
# RMSprop
rmsprop = keras.optimizers.RMSprop(
    learning_rate=0.001,
    rho=0.9,              # decay rate of the moving average of squared gradients
    momentum=0.0,
    epsilon=1e-07
)
# Adagrad - suited to sparse data
adagrad = keras.optimizers.Adagrad(
    learning_rate=0.001,
    initial_accumulator_value=0.1,
    epsilon=1e-07
)
# Adamax - a variant of Adam
adamax = keras.optimizers.Adamax(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-07
)
# Nadam - Nesterov momentum + Adam
nadam = keras.optimizers.Nadam(
    learning_rate=0.001,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-07
)
Optimizer Comparison

| Optimizer | Strengths | Weaknesses | Typical use |
|---|---|---|---|
| SGD | stable, easy to reason about | slow convergence, needs tuning | research, simple models |
| Adam | fast convergence, adaptive learning rates | may generalize worse | general use, rapid prototyping |
| AdamW | fast convergence, good generalization | - | Transformers, large models |
| RMSprop | works well for RNNs | needs tuning | recurrent networks |
Learning Rate Schedules
Adjusting the learning rate dynamically during training can improve results:
import tensorflow as tf
from tensorflow import keras
# Exponential decay
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.1,  # initial learning rate
    decay_steps=10000,          # decay steps
    decay_rate=0.96,            # decay rate
    staircase=False             # if True, decay in discrete steps
)
optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)
# Piecewise constant decay
lr_schedule = keras.optimizers.schedules.PiecewiseConstantDecay(
    boundaries=[10000, 20000],  # step boundaries
    values=[0.1, 0.01, 0.001]   # learning rate for each segment
)
# Cosine decay
lr_schedule = keras.optimizers.schedules.CosineDecay(
    initial_learning_rate=0.1,
    decay_steps=1000,
    alpha=0.0                   # minimum learning rate, as a fraction of the initial rate
)
# Cosine decay with restarts
lr_schedule = keras.optimizers.schedules.CosineDecayRestarts(
    initial_learning_rate=0.1,
    first_decay_steps=1000,
    t_mul=2.0,                  # period growth factor
    m_mul=1.0,                  # restart learning-rate decay factor
    alpha=0.0
)
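The exponential schedule above follows a simple closed form, lr(step) = initial_lr * decay_rate ** (step / decay_steps), with the exponent floored when staircase=True. A pure-Python sketch (function name ours) makes the behavior easy to check:

```python
import math

def exponential_decay(step, initial_lr=0.1, decay_steps=10000,
                      decay_rate=0.96, staircase=False):
    """lr(step) = initial_lr * decay_rate ** (step / decay_steps)."""
    exponent = step / decay_steps
    if staircase:
        exponent = math.floor(exponent)  # hold the rate constant within each interval
    return initial_lr * decay_rate ** exponent

print(exponential_decay(0))                      # initial rate: 0.1
print(exponential_decay(10000))                  # one decay interval: ~0.096
print(exponential_decay(5000, staircase=True))   # staircase: still 0.1 mid-interval
```

Plotting this function over the planned number of training steps before launching a run is a cheap way to confirm the schedule's parameters do what you intend.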
# Compile with a learning rate schedule
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=lr_schedule),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
Model Compilation
Configure the model's learning process with the compile() method:
import tensorflow as tf
from tensorflow import keras
model = keras.Sequential([
    keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(10, activation='softmax')
])
model.compile(
    optimizer='adam',                        # optimizer
    loss='sparse_categorical_crossentropy',  # loss function
    metrics=['accuracy']                     # evaluation metrics
)
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=False),
    metrics=[
        'accuracy',
        keras.metrics.SparseTopKCategoricalAccuracy(k=5)
    ]
)
Common Metrics
import tensorflow as tf
from tensorflow import keras
# A sampler of available metrics; in practice, pick only those that match your task
metrics = [
    'accuracy',                                        # accuracy
    keras.metrics.Precision(),                         # precision
    keras.metrics.Recall(),                            # recall
    keras.metrics.AUC(),                               # area under the ROC curve
    keras.metrics.SparseTopKCategoricalAccuracy(k=5),  # top-5 accuracy
    keras.metrics.MeanAbsoluteError(),                 # mean absolute error
    keras.metrics.RootMeanSquaredError(),              # root mean squared error
]
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=metrics
)
Model Training
Using the fit Method
fit() is the most common way to train a Keras model:
import tensorflow as tf
from tensorflow import keras
import numpy as np
x_train = np.random.random((1000, 784))
y_train = np.random.randint(10, size=(1000,))
x_val = np.random.random((200, 784))
y_val = np.random.randint(10, size=(200,))
history = model.fit(
    x_train,                         # training data
    y_train,                         # training labels
    epochs=10,                       # number of epochs
    batch_size=32,                   # batch size
    validation_data=(x_val, y_val),  # validation data
    verbose=1                        # logging: 0 = silent, 1 = progress bar, 2 = one line per epoch
)
print(f"Training history: {history.history.keys()}")
Using a Validation Split
history = model.fit(
    x_train,
    y_train,
    epochs=10,
    batch_size=32,
    validation_split=0.2,  # hold out 20% of the training data for validation
    shuffle=True           # shuffle the data before each epoch
)
Using tf.data.Dataset
For large datasets, tf.data.Dataset is the recommended input format:
import tensorflow as tf
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(32).prefetch(tf.data.AUTOTUNE)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(32)
history = model.fit(
    train_dataset,
    epochs=10,
    validation_data=val_dataset
)
Visualizing Training History
import matplotlib.pyplot as plt
history = model.fit(x_train, y_train, epochs=20, validation_split=0.2)
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='val')
plt.title('Loss')
plt.xlabel('Epoch')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='train')
plt.plot(history.history['val_accuracy'], label='val')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.show()
Callbacks
Callbacks run custom logic during training, such as saving the model or adjusting the learning rate.
Common Callbacks
import tensorflow as tf
from tensorflow import keras
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',           # metric to monitor
        patience=10,                  # epochs to wait without improvement
        restore_best_weights=True,    # restore the best weights when stopping
        verbose=1
    ),
    keras.callbacks.ModelCheckpoint(
        filepath='best_model.keras',  # save path
        monitor='val_loss',           # metric to monitor
        save_best_only=True,          # keep only the best model
        save_weights_only=False,      # save the full model, not just the weights
        verbose=1
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',           # metric to monitor
        factor=0.5,                   # multiply the learning rate by this factor
        patience=5,                   # epochs to wait without improvement
        min_lr=1e-7,                  # lower bound on the learning rate
        verbose=1
    ),
    keras.callbacks.TensorBoard(
        log_dir='./logs',             # log directory
        histogram_freq=1,             # histogram logging frequency (in epochs)
        write_graph=True,             # log the computation graph
        update_freq='epoch'           # logging frequency
    ),
    keras.callbacks.CSVLogger(
        filename='training_log.csv',  # log file
        separator=',',
        append=False
    )
]
history = model.fit(
    x_train, y_train,
    epochs=100,
    batch_size=32,
    validation_split=0.2,
    callbacks=callbacks
)
Custom Callbacks
import tensorflow as tf
from tensorflow import keras
import numpy as np
class CustomCallback(keras.callbacks.Callback):
    """Example of a custom callback."""
    def on_train_begin(self, logs=None):
        print("Training started!")
    def on_train_end(self, logs=None):
        print("Training finished!")
    def on_epoch_begin(self, epoch, logs=None):
        print(f"\nEpoch {epoch + 1} started")
    def on_epoch_end(self, epoch, logs=None):
        print(f"Epoch {epoch + 1} finished")
        print(f"Training loss: {logs['loss']:.4f}")
        print(f"Validation loss: {logs['val_loss']:.4f}")
    def on_batch_begin(self, batch, logs=None):
        pass
    def on_batch_end(self, batch, logs=None):
        if batch % 100 == 0:
            print(f"  Batch {batch}: loss = {logs['loss']:.4f}")
class LearningRateLogger(keras.callbacks.Callback):
    """Logs the current learning rate at the end of each epoch."""
    def on_epoch_end(self, epoch, logs=None):
        lr = self.model.optimizer.learning_rate
        if isinstance(lr, keras.optimizers.schedules.LearningRateSchedule):
            step = self.model.optimizer.iterations
            lr = lr(step)
        print(f"  Learning rate: {float(lr):.6f}")
history = model.fit(
    x_train, y_train,
    epochs=10,
    validation_split=0.2,
    callbacks=[CustomCallback(), LearningRateLogger()]
)
Custom Training Loops
For more complex training logic, you can write your own training loop:
A Basic Custom Loop
import tensorflow as tf
from tensorflow import keras
import numpy as np
model = keras.Sequential([
    keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    keras.layers.Dropout(0.2),
    keras.layers.Dense(10)
])
optimizer = keras.optimizers.Adam(learning_rate=0.001)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
batch_size = 32
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
epochs = 10
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}")
    # Training phase
    for step, (x_batch, y_batch) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch, training=True)
            loss_value = loss_fn(y_batch, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        train_acc_metric.update_state(y_batch, logits)
        if step % 100 == 0:
            print(f"  Step {step}: loss = {loss_value:.4f}")
    train_acc = train_acc_metric.result()
    print(f"  Training accuracy: {train_acc:.4f}")
    train_acc_metric.reset_states()
    # Validation phase
    for x_batch, y_batch in val_dataset:
        val_logits = model(x_batch, training=False)
        val_acc_metric.update_state(y_batch, val_logits)
    val_acc = val_acc_metric.result()
    print(f"  Validation accuracy: {val_acc:.4f}")
    val_acc_metric.reset_states()
Accelerating with @tf.function
import tensorflow as tf
from tensorflow import keras
model = keras.Sequential([
    keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    keras.layers.Dense(10)
])
optimizer = keras.optimizers.Adam(learning_rate=0.001)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
@tf.function
def train_step(x, y):
    """A single training step."""
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        loss_value += sum(model.losses)  # add regularization losses
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss_value
@tf.function
def test_step(x, y):
    """A single evaluation step."""
    logits = model(x, training=False)
    loss_value = loss_fn(y, logits)
    return loss_value
epochs = 10
for epoch in range(epochs):
    print(f"\nEpoch {epoch + 1}")
    for step, (x_batch, y_batch) in enumerate(train_dataset):
        loss = train_step(x_batch, y_batch)
        if step % 100 == 0:
            print(f"  Step {step}: loss = {loss:.4f}")
Evaluation and Prediction
Model Evaluation
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=2)
print(f"\nTest loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_dataset = test_dataset.batch(32)
test_loss, test_acc = model.evaluate(test_dataset, verbose=2)
Model Prediction
predictions = model.predict(x_test)
print(f"Prediction shape: {predictions.shape}")
predicted_classes = np.argmax(predictions, axis=1)
print(f"Predicted classes: {predicted_classes[:10]}")
predictions = model.predict(test_dataset)
Training Tips
Gradient Clipping
Clip gradients to prevent them from exploding:
import tensorflow as tf
from tensorflow import keras
optimizer = keras.optimizers.Adam(learning_rate=0.001)
@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = keras.losses.sparse_categorical_crossentropy(y, logits, from_logits=True)
        loss_value = tf.reduce_mean(loss_value)
    grads = tape.gradient(loss_value, model.trainable_weights)
    grads, _ = tf.clip_by_global_norm(grads, clip_norm=1.0)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    return loss_value
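Global-norm clipping rescales all gradients together: if the combined L2 norm exceeds clip_norm, every gradient is multiplied by clip_norm / global_norm, preserving their relative directions. A pure-NumPy sketch of that arithmetic (function name ours):

```python
import numpy as np

def clip_by_global_norm(grads, clip_norm=1.0):
    """Rescale all gradients together so their combined L2 norm is at most clip_norm."""
    global_norm = np.sqrt(sum(np.sum(g ** 2) for g in grads))
    scale = clip_norm / max(global_norm, clip_norm)  # no-op when already within the limit
    return [g * scale for g in grads], global_norm

grads = [np.array([3.0, 0.0]), np.array([0.0, 4.0])]  # global norm = sqrt(9 + 16) = 5
clipped, norm = clip_by_global_norm(grads, clip_norm=1.0)
print(norm)     # 5.0
print(clipped)  # every gradient scaled by 1/5
```

Because the scaling factor is shared, the update direction is unchanged; only its magnitude is bounded, unlike per-tensor clipping with clip_by_norm.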
Mixed-Precision Training
Use FP16 to speed up training:
import tensorflow as tf
from tensorflow import keras
keras.mixed_precision.set_global_policy('mixed_float16')
model = keras.Sequential([
    keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    keras.layers.Dense(10, dtype='float32')  # keep the output layer in float32 for numerical stability
])
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)
history = model.fit(x_train, y_train, epochs=10, validation_split=0.2)
Class Weights
Handle class imbalance by weighting the loss per class:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
class_weights = compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weight_dict = dict(enumerate(class_weights))
print(f"Class weights: {class_weight_dict}")
history = model.fit(
    x_train, y_train,
    epochs=10,
    batch_size=32,
    validation_split=0.2,
    class_weight=class_weight_dict
)
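The 'balanced' heuristic used above assigns each class a weight of n_samples / (n_classes * count_of_that_class), so rarer classes get proportionally larger weights. A pure-NumPy sketch (function name ours) of the same computation:

```python
import numpy as np

def balanced_class_weights(y):
    """sklearn-style 'balanced' weights: n_samples / (n_classes * count_per_class)."""
    classes, counts = np.unique(y, return_counts=True)
    weights = len(y) / (len(classes) * counts)
    return dict(zip(classes.tolist(), weights.tolist()))

y = np.array([0, 0, 0, 0, 1, 1])   # class 0 is twice as frequent as class 1
print(balanced_class_weights(y))   # {0: 0.75, 1: 1.5}
```

The weighted counts then match: 4 samples * 0.75 = 2 samples * 1.5, so each class contributes equally to the total loss.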
Sample Weights
Assign a different weight to each individual sample:
sample_weight = np.ones(shape=(len(y_train),))
sample_weight[y_train == 0] = 2.0  # double the weight of class-0 samples
history = model.fit(
    x_train, y_train,
    sample_weight=sample_weight,
    epochs=10,
    batch_size=32,
    validation_split=0.2
)
Complete Training Example
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
model = keras.Sequential([
    keras.layers.Input(shape=(784,)),
    keras.layers.Dense(512, activation='relu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dropout(0.3),
    keras.layers.Dense(256, activation='relu'),
    keras.layers.BatchNormalization(),
    keras.layers.Dropout(0.3),
    keras.layers.Dense(10, activation='softmax')
])
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True,
        verbose=1
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-6,
        verbose=1
    ),
    keras.callbacks.ModelCheckpoint(
        'best_mnist_model.keras',
        monitor='val_accuracy',
        save_best_only=True,
        verbose=1
    )
]
history = model.fit(
    x_train, y_train,
    epochs=50,
    batch_size=128,
    validation_data=(x_val, y_val),
    callbacks=callbacks,
    verbose=1
)
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].plot(history.history['loss'], label='Train')
axes[0].plot(history.history['val_loss'], label='Validation')
axes[0].set_title('Loss')
axes[0].set_xlabel('Epoch')
axes[0].legend()
axes[1].plot(history.history['accuracy'], label='Train')
axes[1].plot(history.history['val_accuracy'], label='Validation')
axes[1].set_title('Accuracy')
axes[1].set_xlabel('Epoch')
axes[1].legend()
plt.tight_layout()
plt.show()
test_loss, test_acc = model.evaluate(x_test, y_test, verbose=0)
print(f"\nTest loss: {test_loss:.4f}")
print(f"Test accuracy: {test_acc:.4f}")
predictions = model.predict(x_test[:5])
predicted_classes = np.argmax(predictions, axis=1)
print(f"\nPredictions for the first 5 samples: {predicted_classes}")
print(f"Ground truth for the first 5 samples: {y_test[:5]}")
Summary
In this chapter we covered:
- Loss functions: MSE, cross-entropy, and other common losses, and how to choose between them
- Optimizers: the characteristics and usage of SGD, Adam, AdamW, and others
- Learning rate schedules: dynamic strategies such as exponential and cosine decay
- Model training: with the fit() method and with custom training loops
- Callbacks: EarlyStopping, ModelCheckpoint, and other common callbacks
- Training tricks: gradient clipping, mixed precision, class weights, and more
Exercises
- Train the same model with several different optimizers and compare convergence speed and final performance
- Implement a custom learning rate scheduler
- Implement gradient accumulation in a custom training loop
- Train a model on an imbalanced dataset using class weights