# Transfer Learning

Transfer learning is one of the most practical techniques in deep learning: it takes a model pretrained on a large dataset and adapts it to a new task through fine-tuning. This chapter covers the principles of transfer learning and how to apply it in practice.
## Transfer Learning Basics

### What Is Transfer Learning?

Transfer learning is a machine learning technique that applies knowledge learned on one task (the source domain) to a related task (the target domain). In computer vision, the usual approach is to start from a model pretrained on a large dataset such as ImageNet and then fine-tune it for the specific task at hand.
### Why Use Transfer Learning?

Transfer learning offers several major advantages:

- Less training data required: the pretrained model has already learned rich features, so good results are possible with a small dataset
- Faster training: starting from pretrained weights makes convergence much quicker
- Better performance: the general-purpose features learned during pretraining often beat what training from scratch can achieve
- Lower compute cost: there is no need to train a large model from scratch
### When to Use Transfer Learning

| Scenario | Data size | Similarity to source data | Recommended strategy |
|---|---|---|---|
| Scenario 1 | Large | High | Fine-tune all layers |
| Scenario 2 | Large | Low | Train from scratch, or fine-tune all layers |
| Scenario 3 | Small | High | Freeze the feature layers; train only the classifier |
| Scenario 4 | Small | Low | Difficult; consider using features from intermediate layers |
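As a rough rule of thumb, the table can be encoded as a tiny helper. This is only an illustrative sketch: the sample-count cutoff (1,000) and the boolean similarity flag are assumptions for illustration, not established thresholds.

```python
def suggest_strategy(num_samples: int, similar_to_source: bool) -> str:
    """Map the (data size, similarity) scenario to a suggested strategy."""
    large = num_samples >= 1000  # illustrative cutoff, not a fixed rule
    if large and similar_to_source:
        return "fine-tune all layers"
    if large and not similar_to_source:
        return "train from scratch, or fine-tune all layers"
    if not large and similar_to_source:
        return "freeze the backbone; train only the classifier"
    return "hard case: consider features from intermediate layers"

print(suggest_strategy(500, True))
```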
## Using Pretrained Models

### torchvision Pretrained Models

PyTorch's torchvision library ships a wide range of pretrained models:

```python
import torch
import torchvision.models as models

# Classic CNNs
resnet18 = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
vgg16 = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
densenet121 = models.densenet121(weights=models.DenseNet121_Weights.DEFAULT)

# Mobile / efficient architectures
mobilenet_v3 = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
efficientnet_b0 = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)

# Transformer architectures
vit_b_16 = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)
swin_t = models.swin_t(weights=models.Swin_T_Weights.DEFAULT)
```
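Recent torchvision releases (0.14+) also let you enumerate the available architectures and load one by name, which is handy for experiments that sweep over backbones:

```python
import torchvision.models as models

# List available architectures by name
print(models.list_models(module=models)[:10])

# Load by name; the string "DEFAULT" resolves to the best available weights
model = models.get_model("resnet18", weights="DEFAULT")
```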
### Inspecting Model Structure

```python
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
print(model)

print("\nFinal layer of the model:")
print(model.fc)

print("\nParameter counts:")
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
```
### Comparison of Common Pretrained Models

| Model | Parameters | Top-1 accuracy | Notes |
|---|---|---|---|
| ResNet-18 | 11.7M | 69.8% | Lightweight; good starting point |
| ResNet-50 | 25.6M | 76.1% | Balances accuracy and speed |
| ResNet-152 | 60.2M | 78.3% | High accuracy |
| VGG-16 | 138M | 71.6% | Simple structure, many parameters |
| DenseNet-121 | 8.0M | 74.4% | Parameter-efficient |
| MobileNet-V3 | 2.5M | 67.7% | Optimized for mobile |
| EfficientNet-B0 | 5.3M | 77.1% | Efficient architecture |
| ViT-B/16 | 86M | 81.1% | Transformer architecture |

The accuracy figures correspond to the original ImageNet weights; note that for some models `DEFAULT` resolves to newer, improved weights (for example, ResNet-50's V2 weights reach roughly 80.9% top-1).
## Fine-Tuning Strategies

### Strategy 1: Feature Extraction

Freeze all pretrained layers and train only the final classification layer:

```python
import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

# Freeze every pretrained parameter
for param in model.parameters():
    param.requires_grad = False

# Replace the final layer; newly created layers are trainable by default
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

print("Trainable parameters:")
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  {name}: {param.shape}")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
# Only the new classifier's parameters go to the optimizer
optimizer = torch.optim.Adam(model.fc.parameters(), lr=0.001)

def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    return total_loss / len(dataloader), correct / total
```
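Since a frozen backbone produces identical features on every epoch, you can optionally run it once over the dataset and train the classifier on cached features, avoiding the repeated expensive forward pass. A minimal sketch under that assumption (the backbone's `fc` is swapped for `nn.Identity()` so it emits raw 512-dim pooled features):

```python
import torch
import torch.nn as nn
import torchvision.models as models

backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
backbone.fc = nn.Identity()  # emit pooled features instead of logits

@torch.no_grad()
def extract_features(backbone, dataloader, device):
    """One pass over the data, caching the frozen backbone's outputs."""
    backbone.eval()
    feats, targets = [], []
    for images, labels in dataloader:
        feats.append(backbone(images.to(device)).cpu())  # (B, 512)
        targets.append(labels)
    return torch.cat(feats), torch.cat(targets)

# The classifier can then be trained on (features, targets) directly,
# e.g. wrapped in a TensorDataset, at a fraction of the cost per epoch.
```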
### Strategy 2: Fine-Tuning All Layers

Unfreeze every layer and train with a small learning rate:

```python
import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

# Make every parameter trainable (this is already the default)
for param in model.parameters():
    param.requires_grad = True

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss()

# Pretrained layers get a smaller learning rate than the new classifier
base_params = [p for n, p in model.named_parameters() if 'fc' not in n]
fc_params = [p for n, p in model.named_parameters() if 'fc' in n]
optimizer = torch.optim.Adam([
    {'params': base_params, 'lr': 1e-4},
    {'params': fc_params, 'lr': 1e-3}
])
```
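One subtlety when everything is unfrozen: with small batch sizes, BatchNorm's running statistics can drift away from the pretrained ones and degrade the features. A common remedy (an optional refinement, not something the strategy requires) is to keep BN layers in eval mode while training:

```python
import torch.nn as nn

def set_bn_eval(model):
    """Freeze BatchNorm running statistics while other layers train."""
    for m in model.modules():
        if isinstance(m, (nn.BatchNorm1d, nn.BatchNorm2d)):
            m.eval()

# Call after every model.train(), since train() resets BN to training mode
model.train()
set_bn_eval(model)
```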
### Strategy 3: Gradual Unfreezing

Train the classifier first, then progressively unfreeze more layers:

```python
import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

def freeze_layers(model, freeze_until='layer4'):
    """
    Freeze every layer that comes before the given layer.

    Args:
        model: the model
        freeze_until: parameters from this layer onward stay trainable;
            everything before it is frozen
    """
    freeze = True
    for name, param in model.named_parameters():
        if freeze_until in name:
            freeze = False
        param.requires_grad = not freeze

print("Stage 1: train only the fc layer")
freeze_layers(model, 'fc')
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  trainable: {name}")

print("\nStage 2: train layer4 and fc")
freeze_layers(model, 'layer4')
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"  trainable: {name}")

print("\nStage 3: train all layers")
for param in model.parameters():
    param.requires_grad = True
```
### Strategy 4: Discriminative Learning Rates

Assign a different learning rate to each group of layers:

```python
import torch
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10)

# Group parameters by ResNet block (the conv1/bn1 stem is left out of
# the optimizer here, so it implicitly stays fixed)
layer1_params = []
layer2_params = []
layer3_params = []
layer4_params = []
fc_params = []
for name, param in model.named_parameters():
    if 'layer1' in name:
        layer1_params.append(param)
    elif 'layer2' in name:
        layer2_params.append(param)
    elif 'layer3' in name:
        layer3_params.append(param)
    elif 'layer4' in name:
        layer4_params.append(param)
    elif 'fc' in name:
        fc_params.append(param)

# Earlier layers learn more general features, so they get smaller rates
optimizer = torch.optim.Adam([
    {'params': layer1_params, 'lr': 1e-5},
    {'params': layer2_params, 'lr': 2e-5},
    {'params': layer3_params, 'lr': 5e-5},
    {'params': layer4_params, 'lr': 1e-4},
    {'params': fc_params, 'lr': 1e-3}
])

print("Discriminative learning rates:")
print("  layer1: 1e-5")
print("  layer2: 2e-5")
print("  layer3: 5e-5")
print("  layer4: 1e-4")
print("  fc:     1e-3")
```
## Custom Classifiers

### Replacing the Final Layer

The simplest form of transfer learning replaces the final classification layer:

```python
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)

num_classes = 10
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)
print(f"New classification layer: {model.fc}")
```
### Adding a Custom Classification Head

Sometimes a more elaborate head is useful:

```python
import torch.nn as nn
import torchvision.models as models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
num_features = model.fc.in_features

model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_features, 512),
    nn.ReLU(),
    nn.BatchNorm1d(512),
    nn.Dropout(0.3),
    nn.Linear(512, 10)
)
print("Custom classification head:")
print(model.fc)
```
### Modifying Multiple Layers

For models such as VGG, the classifier spans several layers:

```python
import torch.nn as nn
import torchvision.models as models

model = models.vgg16(weights=models.VGG16_Weights.DEFAULT)
print("Original classifier:")
print(model.classifier)

# Replace only the last linear layer of the classifier
model.classifier[6] = nn.Linear(4096, 10)
print("\nModified classifier:")
print(model.classifier)
```
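Not every architecture exposes its head as `.fc` or a `.classifier` sequence; the attribute depends on the model family. A quick reference for a few torchvision models:

```python
import torch.nn as nn
import torchvision.models as models

num_classes = 10

# EfficientNet / MobileNetV3: the head is model.classifier[-1]
eff = models.efficientnet_b0(weights=models.EfficientNet_B0_Weights.DEFAULT)
eff.classifier[-1] = nn.Linear(eff.classifier[-1].in_features, num_classes)

# Vision Transformer: the head is model.heads.head
vit = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)
vit.heads.head = nn.Linear(vit.heads.head.in_features, num_classes)

# Swin Transformer: the head is model.head
swin = models.swin_t(weights=models.Swin_T_Weights.DEFAULT)
swin.head = nn.Linear(swin.head.in_features, num_classes)
```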
## Hands-On Example: Image Classification

### A Complete Transfer Learning Pipeline

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

# Configuration
data_dir = './data'
batch_size = 32
num_epochs = 10
num_classes = 10

# Training transforms with augmentation; normalization uses ImageNet stats
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# CIFAR-10 images (32x32) are upscaled to the 224x224 input the
# pretrained model expects
train_dataset = datasets.CIFAR10(root=data_dir, train=True, download=True, transform=train_transform)
val_dataset = datasets.CIFAR10(root=data_dir, train=False, download=True, transform=val_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# Feature-extraction setup: frozen backbone, new classifier
model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
for param in model.parameters():
    param.requires_grad = False
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_classes)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
    return running_loss / len(dataloader), correct / total

def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    return running_loss / len(dataloader), correct / total

# Training loop with checkpointing on the best validation accuracy
best_acc = 0.0
for epoch in range(num_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    scheduler.step()
    print(f"Epoch {epoch+1}/{num_epochs}")
    print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')

print(f"\nBest validation accuracy: {best_acc:.4f}")
```
### Two-Stage Training

Freeze the backbone and train the classifier first, then unfreeze and fine-tune the whole network:

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

# Reuses train_epoch and validate from the pipeline above
def train_transfer_learning(model, train_loader, val_loader, num_classes,
                            num_epochs_stage1=5, num_epochs_stage2=10):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    num_features = model.fc.in_features
    model.fc = nn.Linear(num_features, num_classes)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()

    print("=" * 50)
    print("Stage 1: freeze the feature extractor, train the classifier")
    print("=" * 50)
    for param in model.parameters():
        param.requires_grad = False
    model.fc.weight.requires_grad = True
    model.fc.bias.requires_grad = True
    optimizer = optim.Adam(model.fc.parameters(), lr=0.001)
    for epoch in range(num_epochs_stage1):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        print(f"Stage1 Epoch {epoch+1}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}")

    print("\n" + "=" * 50)
    print("Stage 2: unfreeze all layers, fine-tune the whole network")
    print("=" * 50)
    for param in model.parameters():
        param.requires_grad = True
    base_params = [p for n, p in model.named_parameters() if 'fc' not in n]
    fc_params = [p for n, p in model.named_parameters() if 'fc' in n]
    optimizer = optim.Adam([
        {'params': base_params, 'lr': 1e-4},
        {'params': fc_params, 'lr': 1e-3}
    ])
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs_stage2)
    best_acc = 0.0
    for epoch in range(num_epochs_stage2):
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)
        scheduler.step()
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_finetuned.pth')
        print(f"Stage2 Epoch {epoch+1}: Train Acc={train_acc:.4f}, Val Acc={val_acc:.4f}")
    return model, best_acc

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model, best_acc = train_transfer_learning(model, train_loader, val_loader, num_classes=10)
```
## Advanced Transfer Learning Techniques

### Knowledge Distillation

Use a large model (the teacher) to guide a small model (the student):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models

class DistillationLoss(nn.Module):
    """Weighted sum of a soft KL term (vs. the teacher) and a hard CE term."""
    def __init__(self, temperature=3.0, alpha=0.7):
        super().__init__()
        self.temperature = temperature
        self.alpha = alpha
        self.ce_loss = nn.CrossEntropyLoss()
        self.kl_loss = nn.KLDivLoss(reduction='batchmean')

    def forward(self, student_outputs, teacher_outputs, labels):
        # Soften both distributions with the temperature; the T^2 factor
        # keeps the soft-loss gradients comparable in scale to the hard loss
        soft_loss = self.kl_loss(
            F.log_softmax(student_outputs / self.temperature, dim=1),
            F.softmax(teacher_outputs / self.temperature, dim=1)
        ) * (self.temperature ** 2)
        hard_loss = self.ce_loss(student_outputs, labels)
        return self.alpha * soft_loss + (1 - self.alpha) * hard_loss

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Teacher: in practice this model should already be fine-tuned on the
# target task before it is used to supervise the student
teacher_model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
teacher_model.fc = nn.Linear(teacher_model.fc.in_features, 10)
teacher_model = teacher_model.to(device)
teacher_model.eval()

# Student
student_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
student_model.fc = nn.Linear(student_model.fc.in_features, 10)
student_model = student_model.to(device)

criterion = DistillationLoss(temperature=3.0, alpha=0.7)
optimizer = optim.Adam(student_model.parameters(), lr=0.001)

# One epoch of distillation (train_loader as defined in the pipeline above)
for images, labels in train_loader:
    images, labels = images.to(device), labels.to(device)
    with torch.no_grad():
        teacher_outputs = teacher_model(images)
    student_outputs = student_model(images)
    loss = criterion(student_outputs, teacher_outputs, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
```
### Multi-Task Transfer Learning

Transfer one backbone to several related tasks at once:

```python
import torch
import torch.nn as nn
from torchvision import models

class MultiTaskModel(nn.Module):
    """Shared pretrained backbone with one classification head per task."""
    def __init__(self, backbone, num_classes_task1, num_classes_task2):
        super().__init__()
        self.backbone = backbone
        num_features = backbone.fc.in_features
        # Strip the original classifier so the backbone outputs raw features
        backbone.fc = nn.Identity()
        self.classifier1 = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes_task1)
        )
        self.classifier2 = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes_task2)
        )

    def forward(self, x):
        features = self.backbone(x)
        out1 = self.classifier1(features)
        out2 = self.classifier2(features)
        return out1, out2

backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
multi_task_model = MultiTaskModel(backbone, num_classes_task1=10, num_classes_task2=5)
print("Multi-task model structure:")
print(multi_task_model)
```
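A training step then combines the per-task losses. The 0.5/0.5 weighting below is an assumption for illustration; in practice the weights are tuned (or learned) per task:

```python
import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(multi_task_model.parameters(), lr=1e-4)

def multi_task_step(images, labels_task1, labels_task2, w1=0.5, w2=0.5):
    """One optimization step over a batch labeled for both tasks."""
    out1, out2 = multi_task_model(images)
    loss = w1 * criterion(out1, labels_task1) + w2 * criterion(out2, labels_task2)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()
```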
### Domain Adaptation

Handle the case where the source and target domains have different distributions:

```python
import torch
import torch.nn as nn
from torchvision import models

class GradientReversalFunction(torch.autograd.Function):
    """Identity in the forward pass; negates (and scales) gradients in the
    backward pass, so the feature extractor learns domain-invariant features."""
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.alpha = alpha
        return x.clone()

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output.neg() * ctx.alpha, None

class DomainAdaptationModel(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        # Everything except the final fc, ending in global average pooling
        self.features = nn.Sequential(*list(resnet.children())[:-1])
        num_features = resnet.fc.in_features
        self.classifier = nn.Linear(num_features, num_classes)
        # Predicts which domain a feature vector came from
        self.domain_classifier = nn.Sequential(
            nn.Linear(num_features, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, x, alpha=1.0):
        features = self.features(x)
        features = features.view(features.size(0), -1)
        class_output = self.classifier(features)
        reverse_features = GradientReversalFunction.apply(features, alpha)
        domain_output = self.domain_classifier(reverse_features)
        return class_output, domain_output

model = DomainAdaptationModel(num_classes=10)
print("Domain adaptation model created")
```
## Transfer Learning Best Practices

### Consistent Data Preprocessing

Use the same preprocessing that was used during pretraining:

```python
from torchvision import transforms

# ImageNet channel statistics, matching the pretrained weights
imagenet_normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)

train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    imagenet_normalize
])
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    imagenet_normalize
])
```

In recent torchvision versions, `weights.transforms()` returns the exact evaluation-time preprocessing for a given weights enum (see the inspection example earlier in this chapter).
### Choosing Learning Rates

A useful rule of thumb: pretrained layers get a small learning rate (they only need gentle adjustment), while freshly initialized layers get a larger one:

```python
import torch.nn as nn
import torch.optim as optim
from torchvision import models

model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
model.fc = nn.Linear(model.fc.in_features, 10)

pretrained_params = [p for n, p in model.named_parameters() if 'fc' not in n]
new_params = [p for n, p in model.named_parameters() if 'fc' in n]
optimizer = optim.Adam([
    {'params': pretrained_params, 'lr': 1e-4},
    {'params': new_params, 'lr': 1e-3}
])

print("Learning rate setup:")
print("  pretrained layers: 1e-4 (smaller)")
print("  new layers:        1e-3 (larger)")
```
### Early Stopping and Model Checkpointing

```python
import torch

class EarlyStopping:
    """Stop training when validation loss fails to improve for `patience` epochs."""
    def __init__(self, patience=7, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_loss = None
        self.early_stop = False

    def __call__(self, val_loss, model, path):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(model, path)
        elif val_loss > self.best_loss - self.min_delta:
            # No meaningful improvement this epoch
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.save_checkpoint(model, path)
            self.counter = 0

    def save_checkpoint(self, model, path):
        torch.save(model.state_dict(), path)

# Usage (model, val_loader, criterion, device, validate as defined above;
# the per-epoch training step is omitted for brevity)
early_stopping = EarlyStopping(patience=5, min_delta=0.001)
for epoch in range(100):
    val_loss = validate(model, val_loader, criterion, device)[0]
    early_stopping(val_loss, model, 'best_transfer_model.pth')
    if early_stopping.early_stop:
        print(f"Early stopping at epoch {epoch}")
        break
```
## Common Problems

### Problem 1: Overfitting

Symptom: high training accuracy, low validation accuracy.

Solutions:

- Add more data augmentation
- Use stronger regularization (dropout, weight decay; see the sketch below)
- Reduce the number of trainable parameters
- Use early stopping
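For the regularization item, weight decay and label smoothing are both one-line changes (label smoothing requires PyTorch 1.10+); the specific values below are common starting points, not rules:

```python
import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
```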
### Problem 2: Unstable Training

Symptom: the loss oscillates or diverges.

Solutions:

- Lower the learning rate
- Use learning-rate warmup
- Use gradient clipping (see the sketch below)
- Double-check the data preprocessing
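Gradient clipping bounds the global gradient norm before each update. A sketch of where the call goes in a training step (`max_norm=1.0` is a typical default):

```python
import torch

def clipped_step(model, loss, optimizer, max_norm=1.0):
    """Backward pass with global-norm gradient clipping before the update."""
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=max_norm)
    optimizer.step()
```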
### Problem 3: Performance Below Expectations

Symptom: transfer learning performs worse than training from scratch.

Solutions:

- Check that preprocessing matches what was used during pretraining
- Try a different pretrained model
- Adjust the learning rate and fine-tuning strategy
- Reconsider how far the target domain is from the source domain
## Summary

In this chapter we covered:

- The principle of transfer learning: reusing the knowledge in pretrained models
- Fine-tuning strategies: feature extraction, full fine-tuning, gradual unfreezing, discriminative learning rates
- Custom classifiers: adapting a model's structure to a new task
- Advanced techniques: knowledge distillation, multi-task learning, domain adaptation
- Best practices: consistent preprocessing, learning-rate selection, early stopping