Fine-Tuning in Practice
This chapter walks through hands-on examples of fine-tuning pretrained models for specific tasks, covering common NLP tasks such as text classification and question answering.
Fine-Tuning Overview
Fine-tuning adapts a pretrained model to a specific downstream task:
Pretrained model ──▶ Downstream task data ──▶ Fine-tuning ──▶ Task-specific model
        │                                                          │
        ▼                                                          ▼
General language knowledge                              Task-specific capabilities
• Syntax and semantics                                  • Sentiment analysis
• World knowledge                                       • Question answering
• Reasoning                                             • Text classification
Fine-Tuning for Text Classification
Complete Example: Sentiment Analysis
import torch
from datasets import load_dataset
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
EarlyStoppingCallback,
)
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
# 1. Configuration
MODEL_NAME = "bert-base-chinese"
MAX_LENGTH = 512
BATCH_SIZE = 16
EPOCHS = 5
LEARNING_RATE = 2e-5
# 2. Load the data (Chinese sentiment analysis as the example)
# Toy samples are used here; swap in a real dataset for actual training
train_texts = [
("这部电影太精彩了!", 1),
("完全浪费时间的烂片", 0),
("演员演技很棒,推荐观看", 1),
("剧情拖沓,看得想睡觉", 0),
    # ... more data
]
eval_texts = [
("故事很感人,值得一看", 1),
("特效太差,出戏", 0),
    # ... more data
]
# 3. Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForSequenceClassification.from_pretrained(
MODEL_NAME,
    num_labels=2  # binary classification: positive / negative
)
# 4. Data preprocessing
class SentimentDataset(torch.utils.data.Dataset):
def __init__(self, texts, labels, tokenizer, max_length):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
encoding = self.tokenizer(
text,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].squeeze(),
'attention_mask': encoding['attention_mask'].squeeze(),
'labels': torch.tensor(label, dtype=torch.long)
}
# Build the datasets
train_dataset = SentimentDataset(
[t[0] for t in train_texts],
[t[1] for t in train_texts],
tokenizer,
MAX_LENGTH
)
eval_dataset = SentimentDataset(
[t[0] for t in eval_texts],
[t[1] for t in eval_texts],
tokenizer,
MAX_LENGTH
)
# 5. Evaluation metrics
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
acc = accuracy_score(labels, preds)
f1 = f1_score(labels, preds, average='weighted')
return {
'accuracy': acc,
'f1': f1
}
# 6. Training arguments
training_args = TrainingArguments(
output_dir="./sentiment-model",
num_train_epochs=EPOCHS,
per_device_train_batch_size=BATCH_SIZE,
per_device_eval_batch_size=BATCH_SIZE,
learning_rate=LEARNING_RATE,
weight_decay=0.01,
warmup_ratio=0.1,
logging_steps=10,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="f1",
fp16=torch.cuda.is_available(),
)
# 7. Create the Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
compute_metrics=compute_metrics,
callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)
# 8. Train
trainer.train()
# 9. Save the best model
trainer.save_model("./sentiment-model-best")
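After training, the saved checkpoint can be used for inference. A minimal sketch, assuming the ./sentiment-model-best directory produced above (the tokenizer is reloaded from the original checkpoint because it was not passed to the Trainer and is therefore not in the saved directory):
from transformers import pipeline

# Load the fine-tuned weights; reuse the original tokenizer
classifier = pipeline(
    "text-classification",
    model="./sentiment-model-best",
    tokenizer=MODEL_NAME,
)
print(classifier("这部电影的配乐非常出色"))  # -> [{'label': 'LABEL_1', 'score': ...}], LABEL_1 = positive here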
Parameter-Efficient Fine-Tuning with LoRA
from peft import LoraConfig, get_peft_model, TaskType
# Configure LoRA
lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,   # sequence classification task
    r=16,                         # LoRA rank
    lora_alpha=32,                # LoRA scaling factor (alpha)
    lora_dropout=0.1,             # dropout applied to the LoRA layers
    bias="none",
    target_modules=["query", "key", "value", "dense"]  # modules to adapt (BERT attention/FFN)
)
# Wrap the model with LoRA adapters
model = AutoModelForSequenceClassification.from_pretrained(
MODEL_NAME,
num_labels=2
)
model = get_peft_model(model, lora_config)
# Print the trainable parameter count
model.print_trainable_parameters()
# Example output: trainable params: 1,235,712 || all params: 108,035,072 || trainable%: 1.1437
# Train as usual
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
compute_metrics=compute_metrics,
)
trainer.train()
# Save the LoRA adapter weights
model.save_pretrained("./sentiment-lora")
# Load the LoRA weights for inference
from peft import PeftModel
base_model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=2)
model = PeftModel.from_pretrained(base_model, "./sentiment-lora")
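For deployment, the adapter can also be merged back into the base model so inference no longer depends on PEFT at runtime. A short sketch using PEFT's merge_and_unload (the output path is just an example):
# Merge the LoRA weights into the base model and save a standalone checkpoint
merged_model = model.merge_and_unload()
merged_model.save_pretrained("./sentiment-model-merged")
tokenizer.save_pretrained("./sentiment-model-merged")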
Fine-Tuning for Question Answering
Extractive QA
from datasets import load_dataset
from transformers import (
AutoTokenizer,
AutoModelForQuestionAnswering,
TrainingArguments,
Trainer,
default_data_collator,
)
import torch
# Load the SQuAD dataset
dataset = load_dataset("squad")
# Load the model and tokenizer
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForQuestionAnswering.from_pretrained(model_name)
# Preprocessing function
def prepare_train_features(examples):
    # Tokenize question/context pairs; long contexts are split into overlapping chunks
tokenized_examples = tokenizer(
examples["question"],
examples["context"],
truncation="only_second",
max_length=384,
stride=128,
return_overflowing_tokens=True,
return_offsets_mapping=True,
padding="max_length",
)
    # Map the character-level answer span to token start/end positions
sample_mapping = tokenized_examples.pop("overflow_to_sample_mapping")
offset_mapping = tokenized_examples.pop("offset_mapping")
tokenized_examples["start_positions"] = []
tokenized_examples["end_positions"] = []
for i, offsets in enumerate(offset_mapping):
input_ids = tokenized_examples["input_ids"][i]
cls_index = input_ids.index(tokenizer.cls_token_id)
sequence_ids = tokenized_examples.sequence_ids(i)
sample_index = sample_mapping[i]
answers = examples["answers"][sample_index]
if len(answers["answer_start"]) == 0:
tokenized_examples["start_positions"].append(cls_index)
tokenized_examples["end_positions"].append(cls_index)
else:
start_char = answers["answer_start"][0]
end_char = start_char + len(answers["text"][0])
token_start_index = 0
while sequence_ids[token_start_index] != 1:
token_start_index += 1
token_end_index = len(input_ids) - 1
while sequence_ids[token_end_index] != 1:
token_end_index -= 1
if not (offsets[token_start_index][0] <= start_char and
offsets[token_end_index][1] >= end_char):
tokenized_examples["start_positions"].append(cls_index)
tokenized_examples["end_positions"].append(cls_index)
else:
while token_start_index < len(offsets) and offsets[token_start_index][0] <= start_char:
token_start_index += 1
tokenized_examples["start_positions"].append(token_start_index - 1)
while offsets[token_end_index][1] >= end_char:
token_end_index -= 1
tokenized_examples["end_positions"].append(token_end_index + 1)
return tokenized_examples
# Apply the preprocessing
tokenized_datasets = dataset.map(
prepare_train_features,
batched=True,
remove_columns=dataset["train"].column_names
)
# Training arguments
training_args = TrainingArguments(
output_dir="./qa-model",
evaluation_strategy="epoch",
learning_rate=3e-5,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
num_train_epochs=2,
weight_decay=0.01,
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is available
)
# Create the Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_datasets["train"],
eval_dataset=tokenized_datasets["validation"],
tokenizer=tokenizer,
data_collator=default_data_collator,
)
# Train
trainer.train()
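A quick way to sanity-check the trained model is the question-answering pipeline. A minimal sketch using the in-memory model just trained (the question/context pair is an arbitrary SQuAD-style example):
from transformers import pipeline

qa = pipeline("question-answering", model=model, tokenizer=tokenizer)
result = qa(
    question="Where is the Eiffel Tower located?",
    context="The Eiffel Tower is a wrought-iron lattice tower located in Paris, France.",
)
print(result)  # {'score': ..., 'start': ..., 'end': ..., 'answer': ...}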
Fine-Tuning for Sequence Labeling
Named Entity Recognition (NER)
from datasets import load_dataset
from transformers import (
AutoTokenizer,
AutoModelForTokenClassification,
TrainingArguments,
Trainer,
DataCollatorForTokenClassification,
)
import numpy as np
# Load the CoNLL-2003 dataset
dataset = load_dataset("conll2003")
label_list = dataset["train"].features["ner_tags"].feature.names
label2id = {label: i for i, label in enumerate(label_list)}
id2label = {i: label for i, label in enumerate(label_list)}
# Load the model and tokenizer
model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForTokenClassification.from_pretrained(
model_name,
num_labels=len(label_list),
id2label=id2label,
label2id=label2id,
)
# Preprocessing: align word-level labels with sub-word tokens
def tokenize_and_align_labels(examples):
tokenized_inputs = tokenizer(
examples["tokens"],
truncation=True,
is_split_into_words=True,
)
labels = []
for i, label in enumerate(examples["ner_tags"]):
word_ids = tokenized_inputs.word_ids(batch_index=i)
previous_word_idx = None
label_ids = []
for word_idx in word_ids:
if word_idx is None:
label_ids.append(-100)
elif word_idx != previous_word_idx:
label_ids.append(label[word_idx])
else:
label_ids.append(-100)
previous_word_idx = word_idx
labels.append(label_ids)
tokenized_inputs["labels"] = labels
return tokenized_inputs
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True)
# Data collator (pads inputs and labels dynamically)
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)
# Evaluation metrics
import evaluate
metric = evaluate.load("seqeval")
def compute_metrics(p):
predictions, labels = p
predictions = np.argmax(predictions, axis=2)
true_predictions = [
[label_list[p] for (p, l) in zip(prediction, label) if l != -100]
for prediction, label in zip(predictions, labels)
]
true_labels = [
[label_list[l] for (p, l) in zip(prediction, label) if l != -100]
for prediction, label in zip(predictions, labels)
]
results = metric.compute(predictions=true_predictions, references=true_labels)
return {
"precision": results["overall_precision"],
"recall": results["overall_recall"],
"f1": results["overall_f1"],
"accuracy": results["overall_accuracy"],
}
# Training
training_args = TrainingArguments(
output_dir="./ner-model",
evaluation_strategy="epoch",
learning_rate=5e-5,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
num_train_epochs=3,
weight_decay=0.01,
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is available
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_dataset["train"],
eval_dataset=tokenized_dataset["validation"],
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
trainer.train()
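Likewise, the fine-tuned tagger can be checked with the token-classification pipeline. A short sketch (the example sentence is arbitrary):
from transformers import pipeline

ner = pipeline(
    "token-classification",
    model=model,
    tokenizer=tokenizer,
    aggregation_strategy="simple",  # merge sub-word pieces into whole entity spans
)
print(ner("Hugging Face was founded in New York City."))
# -> entries like {'entity_group': 'ORG', 'word': 'Hugging Face', ...}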
Multi-Task Fine-Tuning
Training Multiple Tasks Simultaneously
import torch
from torch.utils.data import DataLoader
from transformers import AutoModel, AutoTokenizer
class MultiTaskModel(torch.nn.Module):
def __init__(self, model_name, num_labels_task_a, num_labels_task_b):
super().__init__()
self.shared_encoder = AutoModel.from_pretrained(model_name)
self.hidden_size = self.shared_encoder.config.hidden_size
        # Task A: text classification head
self.classifier_a = torch.nn.Linear(self.hidden_size, num_labels_task_a)
        # Task B: sequence labeling head
self.classifier_b = torch.nn.Linear(self.hidden_size, num_labels_task_b)
def forward(self, input_ids, attention_mask, task="a"):
outputs = self.shared_encoder(input_ids=input_ids, attention_mask=attention_mask)
sequence_output = outputs.last_hidden_state
pooled_output = outputs.pooler_output
if task == "a":
logits = self.classifier_a(pooled_output)
elif task == "b":
logits = self.classifier_b(sequence_output)
else:
raise ValueError(f"Unknown task: {task}")
return logits
# Training loop
def train_multitask(model, task_a_loader, task_b_loader, optimizer, epochs):
model.train()
for epoch in range(epochs):
        # Alternate between the two tasks
for batch_a, batch_b in zip(task_a_loader, task_b_loader):
optimizer.zero_grad()
            # Task A forward pass
logits_a = model(
input_ids=batch_a["input_ids"],
attention_mask=batch_a["attention_mask"],
task="a"
)
loss_a = torch.nn.CrossEntropyLoss()(logits_a, batch_a["labels"])
            # Task B forward pass
            logits_b = model(
                input_ids=batch_b["input_ids"],
                attention_mask=batch_b["attention_mask"],
                task="b"
            )
            loss_b = torch.nn.CrossEntropyLoss()(
                logits_b.view(-1, logits_b.size(-1)),  # flatten (batch, seq_len, num_labels)
                batch_b["labels"].view(-1)
            )
            # Combined loss
total_loss = loss_a + loss_b
total_loss.backward()
optimizer.step()
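To make the expected batch shapes concrete, here is a minimal runnable sketch that wires the model and training loop together with random placeholder data (label counts, sequence lengths, and the data itself are illustrative assumptions, not part of the original example):
import torch
from torch.utils.data import DataLoader

def make_dummy_loader(num_samples, seq_len, num_labels, token_level):
    """Random batches shaped like the ones the training loop expects (illustration only)."""
    samples = []
    for _ in range(num_samples):
        label_shape = (seq_len,) if token_level else ()
        samples.append({
            "input_ids": torch.randint(100, 1000, (seq_len,)),
            "attention_mask": torch.ones(seq_len, dtype=torch.long),
            "labels": torch.randint(0, num_labels, label_shape),
        })
    return DataLoader(samples, batch_size=4)

model = MultiTaskModel("bert-base-cased", num_labels_task_a=2, num_labels_task_b=9)
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
task_a_loader = make_dummy_loader(16, 32, num_labels=2, token_level=False)  # classification
task_b_loader = make_dummy_loader(16, 32, num_labels=9, token_level=True)   # sequence labeling
train_multitask(model, task_a_loader, task_b_loader, optimizer, epochs=1)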
Fine-Tuning Best Practices
1. Learning Rate Selection
# Use different learning rates for different parts of the model:
# lower layers (encoder): smaller learning rate
# top layers (classification head): larger learning rate
from torch.optim import AdamW  # transformers.AdamW is deprecated/removed in recent versions
# Layer-wise (discriminative) learning rates
optimizer_grouped_parameters = [
{
"params": [p for n, p in model.named_parameters() if "classifier" not in n],
"lr": 1e-5, # 编码器使用较小学习率
},
{
"params": [p for n, p in model.named_parameters() if "classifier" in n],
"lr": 1e-4, # 分类头使用较大学习率
},
]
optimizer = AdamW(optimizer_grouped_parameters)
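If you train with the Trainer API rather than a manual loop, the grouped optimizer can be passed in through Trainer's optimizers argument. A sketch reusing the sentiment-analysis objects from earlier in this chapter; passing None for the scheduler lets Trainer build its default one:
# Reuse the grouped-parameter optimizer with the Trainer API
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
    optimizers=(optimizer, None),  # (optimizer, lr_scheduler); None -> Trainer creates the scheduler
)
trainer.train()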
2. Data Augmentation
import random
import nltk
from nltk.corpus import wordnet
def synonym_replacement(text, n=2):
    """Synonym replacement (requires nltk.download('wordnet'))."""
words = text.split()
new_words = words.copy()
for _ in range(n):
word_to_replace = random.choice(words)
synonyms = wordnet.synsets(word_to_replace)
if synonyms:
synonym = synonyms[0].lemmas()[0].name()
new_words = [synonym if w == word_to_replace else w for w in new_words]
return " ".join(new_words)
def back_translation(text, intermediate_lang="fr"):
    """Back-translation augmentation: source -> intermediate language -> source."""
    # Minimal sketch, assuming the Helsinki-NLP opus-mt MarianMT checkpoints are available
    from transformers import pipeline
    to_inter = pipeline("translation", model=f"Helsinki-NLP/opus-mt-en-{intermediate_lang}")
    to_back = pipeline("translation", model=f"Helsinki-NLP/opus-mt-{intermediate_lang}-en")
    intermediate = to_inter(text)[0]["translation_text"]
    return to_back(intermediate)[0]["translation_text"]
# Apply the augmentation
augmented_data = []
for text, label in train_texts:
augmented_data.append((text, label))
augmented_data.append((synonym_replacement(text), label))
3. Model Ensembling
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
class EnsembleModel:
def __init__(self, model_paths, weights=None):
self.models = []
self.tokenizers = []
for path in model_paths:
model = AutoModelForSequenceClassification.from_pretrained(path)
tokenizer = AutoTokenizer.from_pretrained(path)
self.models.append(model)
self.tokenizers.append(tokenizer)
self.weights = weights or [1.0 / len(model_paths)] * len(model_paths)
def predict(self, text):
all_logits = []
for model, tokenizer, weight in zip(self.models, self.tokenizers, self.weights):
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits * weight
all_logits.append(logits)
        # Weighted average of the logits (the weights sum to 1)
ensemble_logits = torch.stack(all_logits).sum(dim=0)
predictions = torch.softmax(ensemble_logits, dim=-1)
return predictions
# Use the ensemble
ensemble = EnsembleModel([
"./model-fold-1",
"./model-fold-2",
"./model-fold-3",
])
predictions = ensemble.predict("This is a test sentence.")
4. K-Fold Cross-Validation
from sklearn.model_selection import KFold
import numpy as np
def k_fold_training(dataset, n_splits=5):
kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)
fold_results = []
for fold, (train_idx, val_idx) in enumerate(kfold.split(dataset)):
print(f"Training fold {fold + 1}/{n_splits}")
train_subset = dataset.select(train_idx)
val_subset = dataset.select(val_idx)
        # Create a fresh model for each fold
model = AutoModelForSequenceClassification.from_pretrained(
MODEL_NAME,
num_labels=2
)
        # Training arguments
training_args = TrainingArguments(
output_dir=f"./results-fold-{fold}",
num_train_epochs=3,
per_device_train_batch_size=16,
learning_rate=2e-5,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_subset,
eval_dataset=val_subset,
compute_metrics=compute_metrics,
)
trainer.train()
results = trainer.evaluate()
fold_results.append(results)
        # Save this fold's model
trainer.save_model(f"./model-fold-{fold}")
    # Average the metrics across folds
avg_results = {
key: np.mean([r[key] for r in fold_results])
for key in fold_results[0].keys()
}
return avg_results
Model Evaluation and Testing
Detailed Evaluation Report
from sklearn.metrics import (
classification_report,
confusion_matrix,
precision_recall_fscore_support
)
import matplotlib.pyplot as plt
import seaborn as sns
import torch
from torch.utils.data import DataLoader
def detailed_evaluation(model, test_dataset, tokenizer):
model.eval()
all_preds = []
all_labels = []
for batch in DataLoader(test_dataset, batch_size=32):
with torch.no_grad():
outputs = model(
input_ids=batch["input_ids"],
attention_mask=batch["attention_mask"]
)
preds = outputs.logits.argmax(-1)
all_preds.extend(preds.cpu().numpy())
all_labels.extend(batch["labels"].cpu().numpy())
    # Classification report
print("Classification Report:")
print(classification_report(all_labels, all_preds, target_names=["Negative", "Positive"]))
    # Confusion matrix
cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()
return all_preds, all_labels
Next Steps
After working through these fine-tuning examples, you can move on to:
- Cheat Sheet - frequently used APIs and code snippets