
Transfer Learning

Transfer learning is one of the most practical techniques in deep learning: it reuses a model pretrained on a large-scale dataset and adapts it to a new task through fine-tuning. This chapter covers how transfer learning works and how to apply it in practice.

Transfer Learning Basics

What Is Transfer Learning?

Transfer learning is a machine learning technique that applies knowledge learned on one task (the source domain) to a different but related task (the target domain). In computer vision, a model pretrained on a large-scale dataset such as ImageNet is typically used as the starting point and then fine-tuned for the specific task.

Why Use Transfer Learning?

Transfer learning offers several key advantages:

  1. Lower data requirements: the pretrained model has already learned rich features, so good results are possible with little data
  2. Faster training: starting from pretrained weights, convergence is much quicker
  3. Better performance: the general-purpose features learned during pretraining often beat training from scratch
  4. Lower compute cost: there is no need to train a large model from scratch

When Is Transfer Learning Appropriate?

| Scenario | Data size | Similarity to source data | Recommended strategy |
|---|---|---|---|
| 1 | Large | High | Fine-tune all layers |
| 2 | Large | Low | Train from scratch, or fine-tune all layers |
| 3 | Small | High | Freeze the feature layers, train only the classifier |
| 4 | Small | Low | Difficult; consider using intermediate-layer features |
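The decision rules in the table can be sketched as a small helper. This `choose_strategy` function is purely illustrative (it is not part of any library), but it makes the table's logic executable:

```python
def choose_strategy(data_size, similarity):
    """Map (data size, similarity to the source domain) to a fine-tuning strategy."""
    if data_size == "large" and similarity == "high":
        return "fine-tune all layers"
    if data_size == "large" and similarity == "low":
        return "train from scratch or fine-tune all layers"
    if data_size == "small" and similarity == "high":
        return "freeze features, train only the classifier"
    # small data, low similarity: the hardest case
    return "hard case: consider intermediate-layer features"

print(choose_strategy("small", "high"))
```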

Using Pretrained Models

Pretrained Models in torchvision

PyTorch's torchvision library provides a large collection of pretrained models:

import torch
import torchvision.models as models

# Classic CNN architectures
resnet18 = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
vgg16 = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
densenet121 = models.densenet121(weights=models.DenseNet121_Weights.DEFAULT)
mobilenet_v3 = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)

# More recent architectures
efficientnet_b0 = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
vit_b_16 = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)
swin_t = models.swin_t(weights=models.Swin_T_Weights.DEFAULT)

Inspecting the Model Structure

import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

print(model)

print("\nFinal classification layer:")
print(model.fc)

print("\nParameter counts:")
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")

Comparison of Common Pretrained Models

| Model | Params | Top-1 accuracy | Notes |
|---|---|---|---|
| ResNet-18 | 11.7M | 69.8% | Lightweight; good starting point |
| ResNet-50 | 25.6M | 76.1% | Balances accuracy and speed |
| ResNet-152 | 60.2M | 78.3% | High accuracy |
| VGG-16 | 138M | 71.6% | Simple structure, many parameters |
| DenseNet-121 | 8.0M | 74.4% | Parameter-efficient |
| MobileNet-V3 | 2.5M | 67.7% | Optimized for mobile |
| EfficientNet-B0 | 5.3M | 77.1% | Efficient architecture |
| ViT-B/16 | 86M | 81.1% | Transformer architecture |

Fine-Tuning Strategies

Strategy 1: Feature Extraction

Freeze all pretrained layers and train only the final classification layer:

import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Freeze all pretrained parameters
for param in model.parameters():
    param.requires_grad = False

# Replace the final layer; a freshly created layer is trainable by default
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

print("Trainable parameters:")
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  {name}: {param.shape}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    return total_loss / len(dataloader), correct / total

Strategy 2: Fine-Tuning All Layers

Unfreeze every layer and train with small learning rates:

import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

# Make every parameter trainable
for param in model.parameters():
    param.requires_grad = True

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()

# Smaller learning rate for the pretrained backbone, larger for the new head
base_params = [p for n, p in model.named_parameters() if 'fc' not in n]
fc_params = [p for n, p in model.named_parameters() if 'fc' in n]

optimizer = torch.optim.Adam([
    {'params': base_params, 'lr': 1e-4},
    {'params': fc_params, 'lr': 1e-3}
])

Strategy 3: Gradual Unfreezing

Train the classifier first, then progressively unfreeze more layers:

import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

def freeze_layers(model, freeze_until='layer4'):
    """
    Freeze every layer that comes before `freeze_until`.

    Args:
        model: the model to modify
        freeze_until: name of the first layer to leave trainable
    """
    freeze = True
    for name, param in model.named_parameters():
        if freeze_until in name:
            freeze = False
        param.requires_grad = not freeze

print("Stage 1: train only the fc layer")
freeze_layers(model, 'fc')
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  trainable: {name}")

print("\nStage 2: train layer4 and fc")
freeze_layers(model, 'layer4')
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  trainable: {name}")

print("\nStage 3: train all layers")
for param in model.parameters():
    param.requires_grad = True

Strategy 4: Discriminative Learning Rates

Use a different learning rate for each group of layers:

import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

# Group parameters by layer
layer1_params = []
layer2_params = []
layer3_params = []
layer4_params = []
fc_params = []

for name, param in model.named_parameters():
    if 'layer1' in name:
        layer1_params.append(param)
    elif 'layer2' in name:
        layer2_params.append(param)
    elif 'layer3' in name:
        layer3_params.append(param)
    elif 'layer4' in name:
        layer4_params.append(param)
    elif 'fc' in name:
        fc_params.append(param)

# Earlier layers learn more general features, so they get smaller learning rates
optimizer = torch.optim.Adam([
    {'params': layer1_params, 'lr': 1e-5},
    {'params': layer2_params, 'lr': 2e-5},
    {'params': layer3_params, 'lr': 5e-5},
    {'params': layer4_params, 'lr': 1e-4},
    {'params': fc_params, 'lr': 1e-3}
])

print("Per-layer learning rates:")
print("  layer1: 1e-5")
print("  layer2: 2e-5")
print("  layer3: 5e-5")
print("  layer4: 1e-4")
print("  fc: 1e-3")

Custom Classifiers

Replacing the Final Layer

The simplest form of transfer learning replaces only the final classification layer:

import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

num_classes = 10
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)

print(f"New classification layer: {model.fc}")

Adding a Custom Classification Head

Sometimes a more elaborate head helps:

import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features

model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_features, 512),
    nn.ReLU(),
    nn.BatchNorm1d(512),
    nn.Dropout(0.3),
    nn.Linear(512, 10)
)

print("Custom classification head:")
print(model.fc)

Modifying Multiple Layers

For models such as VGG you may need to modify more than one layer:

import torch.nn as nn
import torchvision.models as models

model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)

print("Original classifier:")
print(model.classifier)

model.classifier[6] = nn.Linear(4096, 10)

print("\nModified classifier:")
print(model.classifier)

Worked Example: Image Classification

A Complete Transfer Learning Pipeline

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

data_dir = './data'
batch_size = 32
num_epochs = 10
num_classes = 10

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

train_dataset = datasets.CIFAR10(root=data_dir, train=True, download=True, transform=train_transform)
val_dataset = datasets.CIFAR10(root=data_dir, train=False, download=True, transform=val_transform)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Feature extraction: freeze the backbone, train only the new fc layer
for param in model.parameters():
    param.requires_grad = False

num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    return running_loss / len(dataloader), correct / total

def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    return running_loss / len(dataloader), correct / total

best_acc = 0.0
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    scheduler.step()

    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')

print(f"\nBest validation accuracy: {best_acc:.4f}")

Two-Stage Training

First train the classifier with the backbone frozen, then unfreeze and fine-tune the whole network:

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

def train_transfer_learning(model, train_loader, val_loader, num_classes, num_epochs_stage1=5, num_epochs_stage2=10):
    # Reuses train_epoch and validate from the previous example
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, num_classes)
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()

    print("=" * 50)
    print("Stage 1: freeze the feature extractor, train the classifier")
    print("=" * 50)

    for param in model.parameters():
        param.requires_grad = False
    model.fc.weight.requires_grad = True
    model.fc.bias.requires_grad = True

    optimizer = optim.Adam(model.fc.parameters(), lr=0.001)

    for epoch in range(num_epochs_stage1):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        print(f"Stage1 Epoch {epoch+1}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}")

    print("\n" + "=" * 50)
    print("Stage 2: unfreeze all layers, fine-tune the whole network")
    print("=" * 50)

    for param in model.parameters():
        param.requires_grad = True

    # Smaller learning rate for pretrained layers, larger for the new head
    base_params = [p for n, p in model.named_parameters() if 'fc' not in n]
    fc_params = [p for n, p in model.named_parameters() if 'fc' in n]

    optimizer = optim.Adam([
        {'params': base_params, 'lr': 1e-4},
        {'params': fc_params, 'lr': 1e-3}
    ])

    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs_stage2)

    best_acc = 0.0
    for epoch in range(num_epochs_stage2):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        scheduler.step()

        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_finetuned.pth')

        print(f"Stage2 Epoch {epoch+1}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}")

    return model, best_acc

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model, best_acc = train_transfer_learning(model, train_loader, val_loader, num_classes=10)

Advanced Transfer Learning Techniques

Knowledge Distillation

Use a large model (the teacher) to guide a smaller model (the student):

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.models as models

class DistillationLoss(nn.Module):
    def __init__(self, temperature=3.0, alpha=0.7):
        super().__init__()
        self.temperature = temperature
        self.alpha = alpha
        self.ce_loss = nn.CrossEntropyLoss()
        self.kl_loss = nn.KLDivLoss(reduction='batchmean')

    def forward(self, student_outputs, teacher_outputs, labels):
        # Soft targets: match the teacher's temperature-softened distribution
        soft_loss = self.kl_loss(
            F.log_softmax(student_outputs / self.temperature, dim=1),
            F.softmax(teacher_outputs / self.temperature, dim=1)
        ) * (self.temperature ** 2)

        # Hard targets: the usual cross-entropy against the true labels
        hard_loss = self.ce_loss(student_outputs, labels)

        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

teacher_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
teacher_model.fc = nn.Linear(teacher_model.fc.in_features, 10)
teacher_model = teacher_model.to(device)
teacher_model.eval()

student_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
student_model.fc = nn.Linear(student_model.fc.in_features, 10)
student_model = student_model.to(device)

criterion = DistillationLoss(temperature=3.0, alpha=0.7)
optimizer = optim.Adam(student_model.parameters(), lr=0.001)

for images, labels in train_loader:
    images, labels = images.to(device), labels.to(device)

    # The teacher provides targets but is never updated
    with torch.no_grad():
        teacher_outputs = teacher_model(images)

    student_outputs = student_model(images)
    loss = criterion(student_outputs, teacher_outputs, labels)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

Multi-Task Transfer Learning

Transfer one backbone to several related tasks:

import torch
import torch.nn as nn
from torchvision import models

class MultiTaskModel(nn.Module):
    def __init__(self, backbone, num_classes_task1, num_classes_task2):
        super().__init__()

        self.backbone = backbone

        # Strip the original classification layer; the backbone now outputs features
        num_features = backbone.fc.in_features
        backbone.fc = nn.Identity()

        self.classifier1 = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes_task1)
        )

        self.classifier2 = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes_task2)
        )

    def forward(self, x):
        # One shared feature extractor feeds both task heads
        features = self.backbone(x)
        out1 = self.classifier1(features)
        out2 = self.classifier2(features)
        return out1, out2

backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
multi_task_model = MultiTaskModel(backbone, num_classes_task1=10, num_classes_task2=5)

print("Multi-task model structure:")
print(multi_task_model)
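Training a multi-task model typically means summing the per-task losses. The sketch below shows one training step with dummy linear heads standing in for the backbone and `MultiTaskModel`; the task weight `lambda_2` is an assumption you would tune per problem:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
features = torch.randn(4, 512)   # a batch of backbone features
head1 = nn.Linear(512, 10)       # task 1: 10 classes
head2 = nn.Linear(512, 5)        # task 2: 5 classes
labels1 = torch.randint(0, 10, (4,))
labels2 = torch.randint(0, 5, (4,))

criterion = nn.CrossEntropyLoss()
params = list(head1.parameters()) + list(head2.parameters())
optimizer = torch.optim.Adam(params, lr=1e-3)

# Weighted sum of per-task losses; lambda_2 balances the two objectives
lambda_2 = 0.5
loss = criterion(head1(features), labels1) + lambda_2 * criterion(head2(features), labels2)

optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"combined loss: {loss.item():.4f}")
```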

Domain Adaptation

Handle cases where the source and target domains have different distributions:

import torch
import torch.nn as nn
from torchvision import models

class GradientReversalFunction(torch.autograd.Function):
    """Identity in the forward pass; reverses (and scales) gradients in the backward pass."""
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.alpha = alpha
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output.neg() * ctx.alpha, None

class DomainAdaptationModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        self.features = nn.Sequential(*list(resnet.children())[:-1])

        num_features = resnet.fc.in_features

        # Label predictor
        self.classifier = nn.Linear(num_features, num_classes)

        # Domain discriminator (source vs. target)
        self.domain_classifier = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x, alpha=1.0):
        features = self.features(x)
        features = features.view(features.size(0), -1)

        class_output = self.classifier(features)

        # The reversal layer trains the backbone to fool the domain classifier
        reverse_features = GradientReversalFunction.apply(features, alpha)
        domain_output = self.domain_classifier(reverse_features)

        return class_output, domain_output

model = DomainAdaptationModel(num_classes=10)
print("Domain adaptation model created")
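In practice, `alpha` is usually ramped up during training so the domain loss only takes hold once the features have stabilized. The schedule below follows the commonly used DANN form; the constant 10 is the conventional choice from that line of work, kept here as an assumption:

```python
import math

def dann_alpha(step, total_steps):
    # Gradient-reversal strength ramps smoothly from 0 toward 1 over training
    p = step / total_steps
    return 2.0 / (1.0 + math.exp(-10.0 * p)) - 1.0

print(dann_alpha(0, 100))    # exactly 0.0 at the start
print(dann_alpha(100, 100))  # close to 1.0 at the end
```

Each forward pass then calls `model(x, alpha=dann_alpha(step, total_steps))`.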

Transfer Learning Best Practices

Consistent Data Preprocessing

Use the same preprocessing that the model saw during pretraining:

from torchvision import transforms

imagenet_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    imagenet_normalize
])

val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    imagenet_normalize
])

Choosing Learning Rates

import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model.fc = nn.Linear(model.fc.in_features, 10)

pretrained_params = [p for n, p in model.named_parameters() if 'fc' not in n]
new_params = [p for n, p in model.named_parameters() if 'fc' in n]

optimizer = optim.Adam([
    {'params': pretrained_params, 'lr': 1e-4},
    {'params': new_params, 'lr': 1e-3}
])

print("Learning rates:")
print("  pretrained layers: 1e-4 (smaller)")
print("  new layers: 1e-3 (larger)")

Early Stopping and Checkpointing

import torch

class EarlyStopping:
    def __init__(self, patience=7, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss, model, path):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(model, path)
        elif val_loss > self.best_loss - self.min_delta:
            # No meaningful improvement this epoch
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.save_checkpoint(model, path)
            self.counter = 0

    def save_checkpoint(self, model, path):
        torch.save(model.state_dict(), path)

early_stopping = EarlyStopping(patience=5, min_delta=0.001)

for epoch in range(100):
    val_loss = validate(model, val_loader, criterion, device)[0]
    early_stopping(val_loss, model, 'best_transfer_model.pth')

    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch}")
        break

Common Problems

Problem 1: Overfitting

Symptom: high training accuracy but low validation accuracy

Solutions

  • Add stronger data augmentation
  • Use stronger regularization (dropout, weight decay)
  • Reduce the number of trainable parameters
  • Use early stopping
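As a sketch, the dropout and weight-decay suggestions combine like this; the values 0.5 and 1e-4 are common starting points, not tuned numbers:

```python
import torch
import torch.nn as nn

# A regularized classification head: dropout inside the module,
# weight decay (L2 penalty) applied via the optimizer
head = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(512, 10),
)
optimizer = torch.optim.Adam(head.parameters(), lr=1e-3, weight_decay=1e-4)
print(optimizer.param_groups[0]['weight_decay'])
```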

Problem 2: Unstable Training

Symptom: the loss oscillates or diverges

Solutions

  • Lower the learning rate
  • Use learning rate warmup
  • Use gradient clipping
  • Double-check the data preprocessing
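The warmup and clipping suggestions can be sketched as below, with a toy linear model standing in for the real network; `LinearLR` and `clip_grad_norm_` are the relevant PyTorch utilities:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(512, 10)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Linear warmup: start at 10% of the base lr, reach 100% after 5 steps
warmup = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=5)

for epoch in range(8):
    x = torch.randn(4, 512)
    loss = model(x).sum()
    optimizer.zero_grad()
    loss.backward()
    # Clip the gradient norm to stabilize updates
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    optimizer.step()
    print(f"epoch {epoch}: lr={optimizer.param_groups[0]['lr']:.2e}")
    warmup.step()
```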

Problem 3: Performance Below Expectations

Symptom: transfer learning performs worse than training from scratch

Solutions

  • Check that preprocessing matches the pretraining setup
  • Try a different pretrained model
  • Adjust the learning rate and fine-tuning strategy
  • Examine how much the target domain differs from the source domain

Summary

In this chapter we covered:

  1. Transfer learning principles: reusing the knowledge in pretrained models
  2. Fine-tuning strategies: feature extraction, full fine-tuning, gradual unfreezing
  3. Custom classifiers: adapting the model structure to a new task
  4. Advanced techniques: knowledge distillation, multi-task learning, domain adaptation
  5. Best practices: preprocessing, learning rate selection, early stopping

References