迁移学习
迁移学习是深度学习中最实用的技术之一,它允许我们利用在大规模数据集上预训练的模型,快速构建高性能的应用。本章介绍如何在 TensorFlow 中使用迁移学习。
迁移学习概述
什么是迁移学习
迁移学习是指将一个任务上学到的知识迁移到另一个相关任务上。在深度学习中,这通常意味着:
- 使用在大规模数据集(如 ImageNet)上预训练的模型
- 根据新任务的需求,调整模型的部分或全部参数
迁移学习的核心思想是:神经网络的前几层学习的是通用特征(如边缘、纹理),这些特征对许多任务都有用;后几层学习的是任务特定的特征。
迁移学习的优势
- 减少训练数据需求:不需要从头训练,小数据集也能获得好效果
- 加快训练速度:预训练权重提供了良好的初始化
- 提高模型性能:预训练模型已经学到了丰富的特征表示
- 降低计算成本:不需要大规模计算资源
适用场景
迁移学习特别适合以下场景:
- 训练数据有限(几百到几千张图片)
- 新任务与预训练任务相关(如都是图像分类)
- 需要快速原型开发
- 计算资源受限
使用预训练模型
TensorFlow 提供了多种预训练模型,可以通过 tf.keras.applications 访问。
可用的预训练模型
import tensorflow as tf
from tensorflow.keras import applications

# Pre-trained image-classification architectures exposed by
# tf.keras.applications, keyed by family name.
pretrained_families = [
    ('ResNet', applications.ResNet50V2),
    ('EfficientNet', applications.EfficientNetB0),
    ('MobileNet', applications.MobileNetV3Small),
    ('VGG', applications.VGG16),
    ('DenseNet', applications.DenseNet121),
    ('Inception', applications.InceptionV3),
    ('Xception', applications.Xception),
]

# Instantiate each architecture without downloading weights
# (weights=None) and report its parameter count.
for name, model_class in pretrained_families:
    params = model_class(weights=None).count_params()
    print(f"{name}: {params:,} 参数")
加载预训练模型
import tensorflow as tf
from tensorflow.keras import applications, layers, models

# Load ResNet50V2 with ImageNet weights as a headless feature extractor.
base_model = applications.ResNet50V2(
    weights='imagenet',       # ImageNet pre-trained weights
    include_top=False,        # drop the 1000-way ImageNet classifier head
    input_shape=(224, 224, 3),
)

# Inspect the architecture and its tensor shapes.
base_model.summary()
print(f"输入形状: {base_model.input_shape}")
print(f"输出形状: {base_model.output_shape}")
模型选择指南
选择预训练模型时需要考虑以下因素:
| 模型 | 参数量 | 准确率 | 推理速度 | 适用场景 |
|---|---|---|---|---|
| MobileNetV3 | ~2.5M | 中等 | 快 | 移动端、嵌入式 |
| EfficientNetB0 | ~5M | 高 | 中等 | 通用场景 |
| ResNet50V2 | ~25M | 高 | 中等 | 通用场景 |
| VGG16 | ~138M | 中等 | 慢 | 特征提取 |
| DenseNet121 | ~8M | 高 | 中等 | 医学图像 |
特征提取
特征提取是最简单的迁移学习方式,冻结预训练模型的所有层,只训练新添加的分类层。
基本流程
import tensorflow as tf
from tensorflow.keras import applications, layers, models

# Step 1: headless EfficientNetB0 pre-trained on ImageNet.
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)

# Step 2: freeze every pre-trained layer -- pure feature extraction.
base_model.trainable = False

# Step 3: stack a new classification head on the frozen backbone.
inputs = layers.Input(shape=(224, 224, 3))
# training=False keeps BatchNormalization in inference mode even
# while the surrounding model is being trained.
features = base_model(inputs, training=False)
pooled = layers.GlobalAveragePooling2D()(features)
regularized = layers.Dropout(0.2)(pooled)
outputs = layers.Dense(10, activation='softmax')(regularized)
model = models.Model(inputs, outputs)

# Step 4: compile -- only the new head's weights will be updated.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)
model.summary()
完整示例:图像分类(以 flowers 数据集为例)
import tensorflow as tf
from tensorflow.keras import applications, layers, models
import matplotlib.pyplot as plt
import numpy as np
import pathlib

# Download the demo dataset (the flowers dataset stands in for the
# cats-vs-dogs data in this example).
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
data_dir = pathlib.Path(
    tf.keras.utils.get_file('flower_photos', origin=dataset_url, untar=True)
)

# Input-pipeline hyper-parameters.
batch_size = 32
img_height = 224
img_width = 224

def _load_split(subset):
    """Build one side of the 80/20 split (the shared seed keeps the
    training and validation partitions disjoint and reproducible)."""
    return tf.keras.utils.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset=subset,
        seed=42,
        image_size=(img_height, img_width),
        batch_size=batch_size,
    )

train_ds = _load_split("training")
val_ds = _load_split("validation")

# Class names are inferred from the sub-directory names.
class_names = train_ds.class_names
num_classes = len(class_names)
print(f"类别: {class_names}")
def preprocess(image, label):
    """Apply EfficientNet's model-specific input preprocessing to one batch."""
    return applications.efficientnet.preprocess_input(image), label
# Apply preprocessing in parallel, then tune the input pipelines:
# cache decoded images, shuffle training data, and prefetch batches.
train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
train_ds = train_ds.cache().shuffle(1000).prefetch(tf.data.AUTOTUNE)
val_ds = val_ds.cache().prefetch(tf.data.AUTOTUNE)

# Frozen EfficientNetB0 backbone plus a small classification head.
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3),
)
base_model.trainable = False

model = models.Sequential([
    layers.Input(shape=(224, 224, 3)),
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.2),
    layers.Dense(num_classes, activation='softmax'),
])
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Train the head only; the backbone stays frozen throughout.
history = model.fit(train_ds, validation_data=val_ds, epochs=10)
# Visualize the training curves.
def plot_history(history):
    """Plot training/validation loss and accuracy side by side."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    panels = [
        ('loss', 'val_loss', 'Loss'),
        ('accuracy', 'val_accuracy', 'Accuracy'),
    ]
    for ax, (train_key, val_key, title) in zip(axes, panels):
        ax.plot(history.history[train_key], label='train')
        ax.plot(history.history[val_key], label='val')
        ax.set_title(title)
        ax.legend()
    plt.show()

plot_history(history)
微调
微调是指在特征提取之后,解冻部分或全部预训练层,以较低的学习率进行训练。
微调策略
# Load the pre-trained backbone.
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)
# Stage 1: feature extraction -- freeze the entire backbone.
base_model.trainable = False
# Build the full model (frozen backbone + trainable head).
model = models.Sequential([
    layers.Input(shape=(224, 224, 3)),
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.2),
    layers.Dense(num_classes, activation='softmax')
])
# Compile and train the head only.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
print("阶段1: 特征提取训练")
history1 = model.fit(train_ds, validation_data=val_ds, epochs=5)
# Stage 2: fine-tuning -- unfreeze the top of the backbone.
base_model.trainable = True
# Keep the first N layers frozen; only the last 20 layers are tuned.
fine_tune_at = len(base_model.layers) - 20
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable = False
# Keep BatchNormalization layers frozen even inside the unfrozen
# range: letting their moving statistics update on a small dataset
# destroys what the pre-trained backbone has learned (this is the
# recommendation of the official TF transfer-learning guide).
for layer in base_model.layers[fine_tune_at:]:
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        layer.trainable = False
# Recompile with a much smaller learning rate so pre-trained weights
# are only nudged, not overwritten.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)
print(f"\n阶段2: 微调后 {len(base_model.layers) - fine_tune_at} 层")
# initial_epoch continues the epoch numbering from stage 1, so
# epochs=10 means 5 additional fine-tuning epochs in total.
history2 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    initial_epoch=history1.epoch[-1] + 1
)
选择微调层数
微调多少层取决于数据集大小和相似度:
def get_fine_tune_strategy(dataset_size, similarity_to_imagenet):
    """Recommend a fine-tuning strategy for a dataset.

    Args:
        dataset_size: 'small' (< 1000), 'medium' (1000-10000),
            'large' (> 10000).
        similarity_to_imagenet: 'high', 'medium' or 'low'.

    Returns:
        A human-readable description of the recommended strategy,
        or the default marker for unknown combinations.
    """
    recommendations = {
        'small': {
            'high': '只训练分类层,冻结全部预训练层',
            'medium': '微调最后几层(约10层)',
            'low': '微调较多层(约20-30层)',
        },
        'medium': {
            'high': '微调最后几层',
            'medium': '微调约20层',
            'low': '微调约50层',
        },
        'large': {
            'high': '微调约20层',
            'medium': '微调约50层',
            'low': '微调全部层或从头训练',
        },
    }
    by_size = recommendations.get(dataset_size, {})
    return by_size.get(similarity_to_imagenet, '默认策略')

# 示例
print(get_fine_tune_strategy('small', 'high'))
print(get_fine_tune_strategy('large', 'low'))
微调最佳实践
# Fine-tuning best practices:
# 1. Run feature extraction first, then fine-tune.
# 2. Fine-tune with a much smaller learning rate (typically 1/10 to
#    1/100 of the feature-extraction phase).
# 3. Unfreeze gradually, starting from the top layers.
# 4. Monitor validation metrics to catch overfitting.
def create_transfer_model(num_classes, base_model_name='efficientnetb0'):
    """Build a frozen-backbone transfer-learning classifier.

    Args:
        num_classes: number of output classes.
        base_model_name: one of 'efficientnetb0', 'resnet50', 'mobilenet'.

    Returns:
        (model, base_model) -- the full classifier plus its backbone so
        the backbone can later be unfrozen for fine-tuning.
    """
    # Resolve the backbone class; unknown names raise KeyError.
    backbones = {
        'efficientnetb0': applications.EfficientNetB0,
        'resnet50': applications.ResNet50V2,
        'mobilenet': applications.MobileNetV3Small,
    }
    base_model = backbones[base_model_name](
        weights='imagenet',
        include_top=False,
        input_shape=(224, 224, 3),
    )
    # Feature-extraction mode: backbone stays frozen.
    base_model.trainable = False
    # Head: global pooling, then two regularized dense stages.
    inputs = layers.Input(shape=(224, 224, 3))
    net = base_model(inputs, training=False)
    net = layers.GlobalAveragePooling2D()(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.3)(net)
    net = layers.Dense(256, activation='relu')(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.3)(net)
    outputs = layers.Dense(num_classes, activation='softmax')(net)
    return models.Model(inputs, outputs), base_model
def fine_tune_model(model, base_model, num_layers_to_unfreeze=20):
    """Switch a transfer model from feature extraction to fine-tuning.

    Unfreezes the top `num_layers_to_unfreeze` backbone layers
    (except BatchNormalization) and recompiles with a small learning
    rate.

    Args:
        model: the full classifier returned by create_transfer_model.
        base_model: the backbone inside `model`.
        num_layers_to_unfreeze: how many top layers to make trainable.

    Returns:
        The same (mutated, recompiled) model.
    """
    # Unfreeze the backbone as a whole ...
    base_model.trainable = True
    # ... then re-freeze everything below the fine-tuning boundary.
    fine_tune_at = len(base_model.layers) - num_layers_to_unfreeze
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False
    # Keep BatchNormalization frozen even in the unfrozen range:
    # updating its moving statistics on a small dataset degrades the
    # pre-trained features (per the TF transfer-learning guide).
    for layer in base_model.layers[fine_tune_at:]:
        if isinstance(layer, tf.keras.layers.BatchNormalization):
            layer.trainable = False
    # Recompile with a learning rate far below the feature-extraction
    # phase so pre-trained weights are only gently adjusted.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model
数据增强
迁移学习通常与小数据集配合使用,数据增强可以有效提高泛化能力。
from tensorflow.keras import layers

# Augmentation pipeline: random geometric and photometric jitter.
# These layers are active only during training.
data_augmentation = tf.keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.2),
    layers.RandomZoom(0.2),
    layers.RandomContrast(0.2),
    layers.RandomTranslation(0.1, 0.1),
])

# Wire augmentation into the model itself: raw images in, augmented
# and preprocessed images into the frozen backbone.
inputs = layers.Input(shape=(224, 224, 3))
augmented = data_augmentation(inputs)
preprocessed = applications.efficientnet.preprocess_input(augmented)
features = base_model(preprocessed, training=False)
pooled = layers.GlobalAveragePooling2D()(features)
outputs = layers.Dense(num_classes, activation='softmax')(pooled)
model = models.Model(inputs, outputs)
不同领域的迁移学习
图像分类
# Image classification with an EfficientNet backbone.
def create_image_classifier(num_classes, input_size=(224, 224)):
    """Return a frozen-EfficientNetB0 classifier with augmentation built in."""
    base_model = applications.EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=(*input_size, 3),
    )
    base_model.trainable = False
    # data_augmentation is the Sequential pipeline defined earlier.
    return models.Sequential([
        layers.Input(shape=(*input_size, 3)),
        data_augmentation,
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.2),
        layers.Dense(num_classes, activation='softmax'),
    ])
目标检测
# Feature extraction with a pre-trained backbone plus a detection head.
def create_object_detector(num_classes):
    """Toy single-stage detector: ResNet50V2 backbone + conv heads.

    This is a simplified sketch (no FPN, anchors or losses); for real
    work prefer the TensorFlow Object Detection API.
    """
    # Fully-convolutional backbone: accepts any input resolution.
    base_model = applications.ResNet50V2(
        weights='imagenet',
        include_top=False,
        input_shape=(None, None, 3)
    )
    inputs = layers.Input(shape=(None, None, 3))
    features = base_model(inputs)
    # Classification branch: per-location class scores
    # (sigmoid -- each class predicted independently).
    cls_output = layers.Conv2D(num_classes, 1, activation='sigmoid')(features)
    # Regression branch: per-location box coordinates (4 values).
    box_output = layers.Conv2D(4, 1)(features)
    return models.Model(inputs, [cls_output, box_output])
文本分类
import tensorflow_hub as hub
# Text classification with a pre-trained TF-Hub sentence embedding.
def create_text_classifier(num_classes):
    """Classifier over a pre-trained (and fine-tunable) Chinese text embedding."""
    # 50-dim NNLM embedding for Chinese: maps a string tensor directly
    # to a dense vector; trainable=True fine-tunes the embedding too.
    embed = hub.KerasLayer(
        "https://tfhub.dev/google/nnlm-zh-dim50/2",
        input_shape=[],
        dtype=tf.string,
        trainable=True
    )
    return models.Sequential([
        embed,
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
# Alternative: a fine-tunable BERT encoder from TF-Hub.
def create_bert_classifier(num_classes):
    """Chinese BERT classifier: preprocessing -> encoder -> dense head."""
    # NOTE: imported for its side effects -- tensorflow_text registers
    # the custom TF ops the hub preprocessing model relies on.
    import tensorflow_text as text
    # Tokenization / input packing for BERT.
    preprocessor = hub.KerasLayer(
        "https://tfhub.dev/tensorflow/bert_zh_preprocess/3"
    )
    # 12-layer Chinese BERT; trainable=True enables full fine-tuning.
    encoder = hub.KerasLayer(
        "https://tfhub.dev/tensorflow/bert_zh_L-12_H-768_A-12/4",
        trainable=True
    )
    # End-to-end graph: raw strings in, class probabilities out.
    inputs = layers.Input(shape=(), dtype=tf.string)
    encoded = encoder(preprocessor(inputs))
    # pooled_output is the sentence-level representation.
    pooled = layers.Dropout(0.1)(encoded['pooled_output'])
    outputs = layers.Dense(num_classes, activation='softmax')(pooled)
    return models.Model(inputs, outputs)
自定义预训练模型
保存和加载自定义模型
# Train a small CNN from scratch to serve as our own "pre-trained" model.
pretrain_model = models.Sequential([
    layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.GlobalAveragePooling2D(),
    layers.Dense(64, activation='relu'),
    layers.Dense(10, activation='softmax')
])
# Compile, train (training call omitted here) and save in the native
# Keras format.
pretrain_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
# pretrain_model.fit(...)
pretrain_model.save('pretrained_model.keras')
# Reload the saved model to reuse it for transfer learning.
base_model = tf.keras.models.load_model('pretrained_model.keras')
# Drop the final 10-way classifier: expose the 64-unit Dense layer as
# the feature output.
feature_extractor = models.Model(
    inputs=base_model.input,
    outputs=base_model.layers[-2].output  # second-to-last layer
)
# Freeze the extractor and attach a fresh head for the new task.
feature_extractor.trainable = False
new_model = models.Sequential([
    feature_extractor,
    layers.Dense(5, activation='softmax')  # number of classes in the new task
])
跨域迁移学习
# Transfer an RGB-pretrained model to grayscale images.
def adapt_rgb_to_grayscale(base_model):
    """Adapt an RGB-pretrained model to 1-channel (grayscale) input.

    The first convolution's RGB kernel is averaged over the channel
    axis, preserving the spatial structure of the learned filters.

    NOTE(review): the rebuild loop below chains the remaining layers
    linearly, so this only works for strictly sequential
    architectures; models with skip connections (e.g. ResNet) need
    functional-API surgery instead.
    """
    # Locate the first Conv2D. For keras.applications models,
    # base_model.layers[0] is an InputLayer with no weights, so
    # indexing layers[0] directly would fail here.
    conv_index, first_conv = next(
        (i, layer) for i, layer in enumerate(base_model.layers)
        if isinstance(layer, tf.keras.layers.Conv2D)
    )
    weights = first_conv.get_weights()
    # RGB kernel shape (h, w, 3, filters) -> grayscale (h, w, 1, filters):
    # average over the channel axis.
    gray_kernel = np.mean(weights[0], axis=2, keepdims=True)
    # Carry the bias over only if the original conv has one -- many
    # pretrained convs are built with use_bias=False.
    new_weights = [gray_kernel] + weights[1:]
    # New single-channel input feeding a replacement first conv.
    new_input = layers.Input(shape=(224, 224, 1))
    x = layers.Conv2D(
        filters=first_conv.filters,
        kernel_size=first_conv.kernel_size,
        strides=first_conv.strides,
        padding=first_conv.padding,
        use_bias=len(weights) > 1,
        weights=new_weights
    )(new_input)
    # Chain the layers that follow the first convolution.
    for layer in base_model.layers[conv_index + 1:]:
        x = layer(x)
    return models.Model(new_input, x)
性能优化技巧
混合精度训练
# Enable mixed precision: compute in float16, keep variables in float32.
tf.keras.mixed_precision.set_global_policy('mixed_float16')
# Build the model under the mixed_float16 policy.
base_model = applications.EfficientNetB0(weights='imagenet', include_top=False)
base_model.trainable = False
model = models.Sequential([
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dense(256, activation='relu'),
    layers.Dense(num_classes, activation='softmax', dtype='float32')  # output layer in float32 for numerical stability
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
使用回调函数
# Early stopping: halt when val_loss stops improving and restore the
# best weights seen so far.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
# Learning-rate decay: shrink the LR by 5x after 3 stagnant epochs.
lr_schedule = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=3,
    min_lr=1e-7
)
# Checkpointing: keep only the best model by validation accuracy.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_model.keras',
    monitor='val_accuracy',
    save_best_only=True
)
callbacks = [early_stop, lr_schedule, checkpoint]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    callbacks=callbacks
)
常见问题
过拟合
# Ways to mitigate overfitting:
# 1. Stronger data augmentation
# 2. More Dropout
# 3. A smaller model
# 4. Fine-tune fewer layers
# 5. Weight regularization
model = models.Sequential([
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.5),  # heavier Dropout
    layers.Dense(256, activation='relu',
                 kernel_regularizer=tf.keras.regularizers.l2(0.01)),  # L2 regularization
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax')
])
模型性能不如预期
# If accuracy is lower than expected, check that the preprocessing
# matches the chosen architecture -- each family expects a different
# input normalization:
#   - efficientnet.preprocess_input: pass-through; the Keras
#     EfficientNet models rescale [0, 255] inputs internally.
#   - resnet_v2.preprocess_input: scales inputs to [-1, 1]
#     (mean subtraction is the v1 / caffe-style ResNet preprocessing,
#     not what this v2 function does).
#   - mobilenet_v3.preprocess_input: pass-through in recent TF
#     releases (MobileNetV3 embeds its own rescaling) -- TODO confirm
#     for the TF version in use.
preprocess_funcs = {
    'efficientnet': applications.efficientnet.preprocess_input,
    'resnet': applications.resnet_v2.preprocess_input,
    'mobilenet': applications.mobilenet_v3.preprocess_input,
}
小结
本章介绍了迁移学习的核心内容:
- 迁移学习原理:利用预训练模型的知识加速新任务学习
- 特征提取:冻结预训练层,只训练分类层
- 微调:解冻部分层,用小学习率继续训练
- 模型选择:根据任务需求选择合适的预训练模型
- 最佳实践:数据增强、混合精度、回调函数
迁移学习是深度学习实践中的重要技能,能够显著减少训练时间和数据需求。下一章我们将学习模型部署,将训练好的模型应用到实际场景中。