Transfer Learning

Transfer learning is one of the most practical techniques in deep learning: by reusing models pretrained on large datasets, we can build high-performing applications quickly. This chapter shows how to apply transfer learning in TensorFlow.

Overview of Transfer Learning

What Is Transfer Learning

Transfer learning means transferring knowledge learned on one task to another, related task. In deep learning this typically means:

  1. Start from a model pretrained on a large dataset (such as ImageNet)
  2. Adapt some or all of its parameters to the needs of the new task

The core idea: the early layers of a neural network learn generic features (edges, textures) that are useful for many tasks, while the later layers learn task-specific features.

Advantages of Transfer Learning

  • Less training data required: no training from scratch, so even small datasets can achieve good results
  • Faster training: pretrained weights provide a good initialization
  • Better model performance: the pretrained model has already learned rich feature representations
  • Lower compute cost: no need for large-scale compute resources

When to Use It

Transfer learning is a particularly good fit when:

  • Training data is limited (a few hundred to a few thousand images)
  • The new task is related to the pretraining task (e.g. both are image classification)
  • You need rapid prototyping
  • Compute resources are constrained

Using Pretrained Models

TensorFlow ships a variety of pretrained models, available through tf.keras.applications.

Available Pretrained Models

import tensorflow as tf
from tensorflow.keras import applications

# Image classification models
models_dict = {
    'ResNet': applications.ResNet50V2,
    'EfficientNet': applications.EfficientNetB0,
    'MobileNet': applications.MobileNetV3Small,
    'VGG': applications.VGG16,
    'DenseNet': applications.DenseNet121,
    'Inception': applications.InceptionV3,
    'Xception': applications.Xception,
}

# Inspect parameter counts (weights=None uses random init, so nothing is downloaded)
for name, model_class in models_dict.items():
    model = model_class(weights=None)
    params = model.count_params()
    print(f"{name}: {params:,} parameters")

Loading a Pretrained Model

import tensorflow as tf
from tensorflow.keras import applications, layers, models

# Load a pretrained ResNet50V2
base_model = applications.ResNet50V2(
    weights='imagenet',        # use ImageNet pretrained weights
    include_top=False,         # exclude the top classification head
    input_shape=(224, 224, 3)  # input shape
)

# Inspect the architecture
base_model.summary()

# Inspect input/output shapes
print(f"Input shape: {base_model.input_shape}")
print(f"Output shape: {base_model.output_shape}")

Model Selection Guide

Consider the following factors when choosing a pretrained model:

| Model            | Params | Accuracy | Typical use case   |
| ---------------- | ------ | -------- | ------------------ |
| MobileNetV3Small | ~2.5M  | Medium   | Mobile / embedded  |
| EfficientNetB0   | ~5M    | Medium   | General purpose    |
| ResNet50V2       | ~25M   | Medium   | General purpose    |
| VGG16            | ~138M  | Medium   | Feature extraction |
| DenseNet121      | ~8M    | Medium   | Medical imaging    |
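As a toy illustration of the budget-driven choice the table suggests, a hypothetical helper can pick the highest-capacity model that fits under a parameter budget. The approximate counts come from the table; the helper itself is made up for illustration, and real selection should also weigh accuracy and inference speed:

```python
# Approximate parameter counts (in millions), taken from the table above.
PARAM_COUNTS_M = {
    'MobileNetV3Small': 2.5,
    'EfficientNetB0': 5,
    'DenseNet121': 8,
    'ResNet50V2': 25,
    'VGG16': 138,
}

def pick_model(budget_millions):
    """Return the largest model that fits under a parameter budget.

    Hypothetical helper for illustration only.
    """
    candidates = [(n, p) for n, p in PARAM_COUNTS_M.items() if p <= budget_millions]
    if not candidates:
        return None
    # Among models that fit, prefer the one with the most capacity.
    return max(candidates, key=lambda item: item[1])[0]

print(pick_model(10))  # DenseNet121
print(pick_model(3))   # MobileNetV3Small
```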

Feature Extraction

Feature extraction is the simplest form of transfer learning: freeze all layers of the pretrained model and train only the newly added classification layers.

Basic Workflow

import tensorflow as tf
from tensorflow.keras import applications, layers, models

# 1. Load the pretrained model (without the top classifier)
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)

# 2. Freeze the pretrained weights
base_model.trainable = False

# 3. Add a custom classification head
inputs = layers.Input(shape=(224, 224, 3))
x = base_model(inputs, training=False)  # keep BatchNorm layers in inference mode
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.2)(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = models.Model(inputs, outputs)

# 4. Compile the model
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

model.summary()
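To see why feature extraction trains so quickly, count what is actually trainable in the model above: only the Dense head. A back-of-envelope sketch (the 1280-wide feature vector is EfficientNetB0's pooled output; the ~4M backbone count is approximate):

```python
# Back-of-envelope count of trainable parameters in the feature-extraction model.
features = 1280   # channels in EfficientNetB0's pooled feature vector
num_classes = 10

# Dense(10) on top of GlobalAveragePooling2D: weights + biases
head_params = features * num_classes + num_classes
backbone_params = 4_000_000  # rough size of the frozen EfficientNetB0 base

print(head_params)                                    # 12810
print(round(head_params / backbone_params * 100, 2))  # 0.32 (% of the backbone)
```

Under one percent of the network's parameters receive gradients, which is what makes training fast and small datasets viable.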

Complete Example: Flower Classification

import tensorflow as tf
from tensorflow.keras import applications, layers, models
import matplotlib.pyplot as plt
import numpy as np
import pathlib

# Download the flowers dataset (a stand-in for any small image dataset,
# e.g. a cats-vs-dogs task)
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
data_dir = tf.keras.utils.get_file('flower_photos', origin=dataset_url, untar=True)
data_dir = pathlib.Path(data_dir)

# Build the datasets
batch_size = 32
img_height = 224
img_width = 224

train_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=42,
    image_size=(img_height, img_width),
    batch_size=batch_size
)

val_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=42,
    image_size=(img_height, img_width),
    batch_size=batch_size
)

# Class names
class_names = train_ds.class_names
num_classes = len(class_names)
print(f"Classes: {class_names}")

# Preprocessing
def preprocess(image, label):
    # The preprocessing EfficientNet expects
    image = applications.efficientnet.preprocess_input(image)
    return image, label

train_ds = train_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
val_ds = val_ds.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

# Input-pipeline performance
train_ds = train_ds.cache().shuffle(1000).prefetch(tf.data.AUTOTUNE)
val_ds = val_ds.cache().prefetch(tf.data.AUTOTUNE)

# Build the model
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)
base_model.trainable = False

model = models.Sequential([
    layers.Input(shape=(224, 224, 3)),
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.2),
    layers.Dense(num_classes, activation='softmax')
])

model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Train
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10
)

# Plot the training curves
def plot_history(history):
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    axes[0].plot(history.history['loss'], label='train')
    axes[0].plot(history.history['val_loss'], label='val')
    axes[0].set_title('Loss')
    axes[0].legend()

    axes[1].plot(history.history['accuracy'], label='train')
    axes[1].plot(history.history['val_accuracy'], label='val')
    axes[1].set_title('Accuracy')
    axes[1].legend()

    plt.show()

plot_history(history)

Fine-Tuning

Fine-tuning follows feature extraction: unfreeze some or all of the pretrained layers and continue training at a much lower learning rate.

Fine-Tuning Strategy

# Load the pretrained model
base_model = applications.EfficientNetB0(
    weights='imagenet',
    include_top=False,
    input_shape=(224, 224, 3)
)

# Phase 1: feature extraction (freeze every layer)
base_model.trainable = False

# Build the full model
model = models.Sequential([
    layers.Input(shape=(224, 224, 3)),
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.2),
    layers.Dense(num_classes, activation='softmax')
])

# Compile and train (feature-extraction phase)
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

print("Phase 1: feature-extraction training")
history1 = model.fit(train_ds, validation_data=val_ds, epochs=5)

# Phase 2 (fine-tuning): unfreeze part of the base model
base_model.trainable = True

# Freeze the first N layers; fine-tune only the rest
fine_tune_at = len(base_model.layers) - 20
for layer in base_model.layers[:fine_tune_at]:
    layer.trainable = False

# Recompile with a much smaller learning rate
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

print(f"\nPhase 2: fine-tuning the top {len(base_model.layers) - fine_tune_at} layers")
history2 = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=10,
    initial_epoch=history1.epoch[-1] + 1
)

Choosing How Many Layers to Fine-Tune

How many layers to fine-tune depends on dataset size and its similarity to the pretraining data:

def get_fine_tune_strategy(dataset_size, similarity_to_imagenet):
    """
    Pick a fine-tuning strategy based on the dataset.

    Args:
        dataset_size: 'small' (< 1000), 'medium' (1000-10000), 'large' (> 10000)
        similarity_to_imagenet: 'high', 'medium', 'low'
    """
    strategies = {
        ('small', 'high'): 'Train only the classifier; keep all pretrained layers frozen',
        ('small', 'medium'): 'Fine-tune the last few layers (about 10)',
        ('small', 'low'): 'Fine-tune more layers (about 20-30)',
        ('medium', 'high'): 'Fine-tune the last few layers',
        ('medium', 'medium'): 'Fine-tune about 20 layers',
        ('medium', 'low'): 'Fine-tune about 50 layers',
        ('large', 'high'): 'Fine-tune about 20 layers',
        ('large', 'medium'): 'Fine-tune about 50 layers',
        ('large', 'low'): 'Fine-tune all layers, or train from scratch',
    }
    return strategies.get((dataset_size, similarity_to_imagenet), 'Default strategy')

# Examples
print(get_fine_tune_strategy('small', 'high'))
print(get_fine_tune_strategy('large', 'low'))

Fine-Tuning Best Practices

# 1. Do feature-extraction training first, then fine-tune
# 2. Fine-tune with a much smaller learning rate (typically 1/10 to 1/100 of
#    the feature-extraction rate)
# 3. Unfreeze gradually: start from the top layers and work downward
# 4. Monitor validation metrics to catch overfitting early

def create_transfer_model(num_classes, base_model_name='efficientnetb0'):
    """Build a transfer-learning model."""

    # Look up the model class
    model_dict = {
        'efficientnetb0': applications.EfficientNetB0,
        'resnet50': applications.ResNet50V2,
        'mobilenet': applications.MobileNetV3Small,
    }
    base_model_class = model_dict[base_model_name]

    # Load the pretrained model
    base_model = base_model_class(
        weights='imagenet',
        include_top=False,
        input_shape=(224, 224, 3)
    )

    # Freeze it
    base_model.trainable = False

    # Build the model
    inputs = layers.Input(shape=(224, 224, 3))
    x = base_model(inputs, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dropout(0.3)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)
    return model, base_model

def fine_tune_model(model, base_model, num_layers_to_unfreeze=20):
    """Switch the model into fine-tuning mode."""

    # Unfreeze the base model
    base_model.trainable = True

    # Re-freeze the earlier layers
    fine_tune_at = len(base_model.layers) - num_layers_to_unfreeze
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False

    # Recompile with a smaller learning rate
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model
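The "unfreeze gradually" advice can be sketched without TensorFlow: given a layer count and a per-stage unfreeze plan, compute the `fine_tune_at` boundary for each stage. The stage sizes below are illustrative, not a recommendation:

```python
def unfreeze_schedule(total_layers, stages):
    """For each stage, return the index from which layers become trainable.

    stages lists how many top layers to unfreeze per stage, e.g. [0, 10, 30]:
    feature extraction first, then the top 10 layers, then the top 30.
    Purely illustrative numbers.
    """
    plan = []
    for n_unfrozen in stages:
        fine_tune_at = max(total_layers - n_unfrozen, 0)
        plan.append(fine_tune_at)
    return plan

# EfficientNetB0 has a couple hundred layers; 240 is a stand-in here.
print(unfreeze_schedule(240, [0, 10, 30]))  # [240, 230, 210]
```

Each stage would freeze `base_model.layers[:fine_tune_at]` and recompile with a lower learning rate, as in the two-phase example above.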

Data Augmentation

Transfer learning is usually paired with small datasets, so data augmentation is an effective way to improve generalization.

from tensorflow.keras import layers

# Define the augmentation pipeline
data_augmentation = tf.keras.Sequential([
    layers.RandomFlip('horizontal'),
    layers.RandomRotation(0.2),
    layers.RandomZoom(0.2),
    layers.RandomContrast(0.2),
    layers.RandomTranslation(0.1, 0.1),
])

# Use it inside the model
inputs = layers.Input(shape=(224, 224, 3))
x = data_augmentation(inputs)
x = applications.efficientnet.preprocess_input(x)
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(num_classes, activation='softmax')(x)

model = models.Model(inputs, outputs)
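Conceptually, each augmentation layer above applies a random, label-preserving transform during training only (Keras preprocessing layers are pass-throughs at inference). A horizontal flip, for example, is just a reversal of the width axis, as this NumPy sketch shows:

```python
import numpy as np

# A 1x2x2x3 "image" batch: (batch, height, width, channels).
batch = np.arange(12).reshape(1, 2, 2, 3)

def horizontal_flip(images):
    """Deterministic flip along the width axis; Keras' RandomFlip applies
    this with probability 0.5 during training and not at all at inference."""
    return images[:, :, ::-1, :]

flipped = horizontal_flip(batch)
# The pixel that was at width index 1 is now at width index 0.
print(flipped[0, 0, 0].tolist())  # [3, 4, 5]
```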

Transfer Learning Across Domains

Image Classification

# Image classification with EfficientNet
def create_image_classifier(num_classes, input_size=(224, 224)):
    base_model = applications.EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=(*input_size, 3)
    )
    base_model.trainable = False

    model = models.Sequential([
        layers.Input(shape=(*input_size, 3)),
        data_augmentation,
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dropout(0.2),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model

Object Detection

# Use a pretrained backbone for feature extraction, plus a detection head
def create_object_detector(num_classes):
    # ResNet as the backbone
    base_model = applications.ResNet50V2(
        weights='imagenet',
        include_top=False,
        input_shape=(None, None, 3)
    )

    # A real detector would add a feature pyramid network here (omitted for
    # brevity); in practice, prefer the TensorFlow Object Detection API
    inputs = layers.Input(shape=(None, None, 3))
    features = base_model(inputs)

    # Detection head
    # Classification branch
    cls_output = layers.Conv2D(num_classes, 1, activation='sigmoid')(features)
    # Box-regression branch
    box_output = layers.Conv2D(4, 1)(features)

    model = models.Model(inputs, [cls_output, box_output])
    return model

Text Classification

import tensorflow_hub as hub

# Use a pretrained text embedding
def create_text_classifier(num_classes):
    # Load a pretrained text-embedding layer
    embed = hub.KerasLayer(
        "https://tfhub.dev/google/nnlm-zh-dim50/2",
        input_shape=[],
        dtype=tf.string,
        trainable=True
    )

    model = models.Sequential([
        embed,
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model

# Or use BERT
def create_bert_classifier(num_classes):
    import tensorflow_text as text  # registers the ops BERT preprocessing needs

    # Preprocessing layer
    preprocessor = hub.KerasLayer(
        "https://tfhub.dev/tensorflow/bert_zh_preprocess/3"
    )

    # BERT encoder
    encoder = hub.KerasLayer(
        "https://tfhub.dev/tensorflow/bert_zh_L-12_H-768_A-12/4",
        trainable=True
    )

    # Build the model
    inputs = layers.Input(shape=(), dtype=tf.string)
    x = preprocessor(inputs)
    x = encoder(x)
    x = x['pooled_output']
    x = layers.Dropout(0.1)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs, outputs)
    return model

Custom Pretrained Models

Saving and Loading a Custom Model

# Train a model to serve as the pretrained base
pretrain_model = models.Sequential([
    layers.Input(shape=(32, 32, 3)),
    layers.Conv2D(32, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Conv2D(64, 3, activation='relu'),
    layers.GlobalAveragePooling2D(),
    layers.Dense(64, activation='relu'),
    layers.Dense(10, activation='softmax')
])

# Train and save
pretrain_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
# pretrain_model.fit(...)
pretrain_model.save('pretrained_model.keras')

# Load it for transfer learning
base_model = tf.keras.models.load_model('pretrained_model.keras')

# Strip the final classification layer
feature_extractor = models.Model(
    inputs=base_model.input,
    outputs=base_model.layers[-2].output  # second-to-last layer
)

# Freeze it and add a new classification head
feature_extractor.trainable = False

new_model = models.Sequential([
    feature_extractor,
    layers.Dense(5, activation='softmax')  # number of classes in the new task
])

Cross-Domain Transfer Learning

# Adapting an RGB-pretrained model to grayscale images
def adapt_rgb_to_grayscale(base_model):
    """Adapt an RGB-pretrained model to grayscale input."""

    # Find the first convolutional layer (layers[0] is often an InputLayer)
    first_conv = next(l for l in base_model.layers
                      if isinstance(l, layers.Conv2D))
    weights = first_conv.get_weights()

    # RGB kernel shape: (h, w, 3, filters); grayscale: (h, w, 1, filters).
    # Average over the RGB channel axis.
    new_weights = np.mean(weights[0], axis=2, keepdims=True)

    # New input layer and a matching first convolution
    new_input = layers.Input(shape=(224, 224, 1))
    new_conv = layers.Conv2D(
        filters=first_conv.filters,
        kernel_size=first_conv.kernel_size,
        strides=first_conv.strides,
        padding=first_conv.padding,
        use_bias=first_conv.use_bias
    )
    x = new_conv(new_input)
    new_conv.set_weights([new_weights] + weights[1:])  # keep bias if present

    # Reconnect the remaining layers (assumes a strictly sequential model;
    # branching architectures need the functional API instead)
    for layer in base_model.layers[1:]:
        if isinstance(layer, layers.InputLayer) or layer is first_conv:
            continue
        x = layer(x)

    return models.Model(new_input, x)
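The channel-averaging trick can be sanity-checked in plain NumPy: the averaged kernel has shape (h, w, 1, filters), and on an image whose three channels are identical its response is exactly one third of the RGB kernel's (so scale the averaged weights by 3 if you want matching activations):

```python
import numpy as np

rng = np.random.default_rng(0)
rgb_kernel = rng.standard_normal((3, 3, 3, 8))            # (h, w, 3, filters)
gray_kernel = np.mean(rgb_kernel, axis=2, keepdims=True)  # (h, w, 1, filters)

gray_patch = rng.standard_normal((3, 3, 1))
rgb_patch = np.repeat(gray_patch, 3, axis=2)  # gray replicated into 3 channels

# One output position of a convolution = elementwise product summed over
# height, width, and channels
rgb_response = np.tensordot(rgb_patch, rgb_kernel, axes=([0, 1, 2], [0, 1, 2]))
gray_response = np.tensordot(gray_patch, gray_kernel, axes=([0, 1, 2], [0, 1, 2]))

print(gray_kernel.shape)                             # (3, 3, 1, 8)
print(np.allclose(3 * gray_response, rgb_response))  # True
```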

Performance Optimization Tips

Mixed-Precision Training

# Enable mixed precision
tf.keras.mixed_precision.set_global_policy('mixed_float16')

# Build the model
base_model = applications.EfficientNetB0(weights='imagenet', include_top=False)
base_model.trainable = False

model = models.Sequential([
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dense(256, activation='relu'),
    layers.Dense(num_classes, activation='softmax', dtype='float32')  # keep the output layer in float32
])

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

Using Callbacks

callbacks = [
    # Early stopping
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True
    ),
    # Learning-rate decay
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.2,
        patience=3,
        min_lr=1e-7
    ),
    # Save the best model
    tf.keras.callbacks.ModelCheckpoint(
        'best_model.keras',
        monitor='val_accuracy',
        save_best_only=True
    ),
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    callbacks=callbacks
)

Common Problems

Overfitting

# Remedies:
# 1. More data augmentation
# 2. More Dropout
# 3. A smaller model
# 4. Fine-tune fewer layers
# 5. Regularization

model = models.Sequential([
    base_model,
    layers.GlobalAveragePooling2D(),
    layers.Dropout(0.5),  # heavier Dropout
    layers.Dense(256, activation='relu',
                 kernel_regularizer=tf.keras.regularizers.l2(0.01)),  # L2 regularization
    layers.Dropout(0.5),
    layers.Dense(num_classes, activation='softmax')
])

Performance Worse Than Expected

# Check that the preprocessing matches the model family:
# different families expect different input preprocessing.

# EfficientNet / MobileNetV3: rescaling is built into the model, so raw
#   [0, 255] inputs are fine (their preprocess_input is a pass-through)
# ResNet50V2 (resnet_v2): inputs are scaled to [-1, 1]
# ResNet50 (v1, 'caffe' mode): per-channel ImageNet means are subtracted

# The matching preprocessing functions
preprocess_funcs = {
    'efficientnet': applications.efficientnet.preprocess_input,
    'resnet_v2': applications.resnet_v2.preprocess_input,
    'mobilenet_v3': applications.mobilenet_v3.preprocess_input,
}
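For the families that use the 'tf' preprocessing mode (e.g. resnet_v2), the transform is simply x / 127.5 - 1, mapping [0, 255] into [-1, 1]; this NumPy equivalent reproduces it without TensorFlow:

```python
import numpy as np

def tf_mode_preprocess(x):
    """Equivalent of the 'tf' preprocessing mode (used by e.g. resnet_v2):
    scale pixel values from [0, 255] to [-1, 1]."""
    return np.asarray(x, dtype=np.float32) / 127.5 - 1.0

pixels = np.array([0, 127.5, 255])
print(tf_mode_preprocess(pixels).tolist())  # [-1.0, 0.0, 1.0]
```

Feeding [0, 255] images into a model that expects [-1, 1] (or vice versa) is one of the most common causes of silently poor accuracy.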

Summary

This chapter covered the core ideas of transfer learning:

  1. Principle: reuse a pretrained model's knowledge to speed up learning on a new task
  2. Feature extraction: freeze the pretrained layers and train only the classifier
  3. Fine-tuning: unfreeze some layers and keep training with a small learning rate
  4. Model selection: choose a pretrained model that fits the task's constraints
  5. Best practices: data augmentation, mixed precision, callbacks

Transfer learning is an essential practical skill: it dramatically reduces training time and data requirements. In the next chapter we turn to model deployment, putting trained models to work in real applications.