异常检测
异常检测(Anomaly Detection)是机器学习中的重要应用领域,旨在识别与正常数据模式显著不同的观测值。异常检测在欺诈检测、故障诊断、网络入侵检测、医疗诊断等领域有着广泛的应用。本章将介绍 sklearn 中提供的异常检测方法和算法。
异常检测基础概念
什么是异常?
异常(Anomaly),也称为离群点(Outlier),是指与大多数数据显著不同的数据点。异常可能源于数据录入错误、测量误差,也可能是真实但罕见的特殊事件。
异常的例子:
- 信用卡交易中的异常消费模式
- 服务器日志中的异常请求
- 医疗数据中的异常指标
- 工业设备传感器中的异常读数
异常检测 vs 新颖性检测
sklearn 区分了两种相关但不同的任务:
异常检测(Outlier Detection):训练数据本身包含异常值,目标是识别这些异常。这是一个无监督学习问题,因为事先不知道哪些是异常。
新颖性检测(Novelty Detection):训练数据是"干净"的(不包含异常),目标是检测新观测是否为异常。这是一个半监督学习问题,因为训练数据代表正常模式。
| 特性 | 异常检测 | 新颖性检测 |
|---|---|---|
| 训练数据 | 包含异常值 | 不包含异常值 |
| 学习方式 | 无监督 | 半监督 |
| 应用场景 | 数据清洗、异常识别 | 新数据判断 |
| 可用方法 | predict 新数据 | fit_predict 训练数据 |
异常检测的输出
sklearn 中的异常检测器遵循统一的接口:
- 标签预测:
predict()返回 1(正常)或 -1(异常) - 异常分数:
score_samples()返回异常分数,值越小越异常 - 决策函数:
decision_function()返回决策值,负值表示异常
from sklearn.ensemble import IsolationForest
# 创建并训练模型
clf = IsolationForest(random_state=42)
clf.fit(X_train)
# 预测:1 为正常,-1 为异常
predictions = clf.predict(X_test)
# 异常分数:越小越异常
scores = clf.score_samples(X_test)
Isolation Forest(孤立森林)
Isolation Forest 是一种高效的异常检测算法,特别适合高维数据。它基于一个简单而强大的思想:异常点更容易被"孤立"。
算法原理
Isolation Forest 通过随机选择特征和随机选择分割值来构建决策树。异常点通常具有以下特点:
- 异常点数量少
- 异常点的特征值与正常点差异大
这两个特点使得异常点在随机分割过程中更容易被早期隔离,即从根节点到异常点的路径更短。
路径长度的意义:
- 路径短 → 容易被隔离 → 可能是异常
- 路径长 → 不容易被隔离 → 可能是正常
通过构建多棵随机树并平均路径长度,可以得到稳健的异常分数。
基本用法
from sklearn.ensemble import IsolationForest
import numpy as np
# 生成示例数据
rng = np.random.RandomState(42)
X = 0.3 * rng.randn(100, 2)
X_train = np.r_[X + 2, X - 2] # 正常数据
X_outliers = rng.uniform(low=-4, high=4, size=(20, 2)) # 异常数据
# 创建孤立森林模型
clf = IsolationForest(
n_estimators=100, # 树的数量
max_samples='auto', # 每棵树使用的样本数
contamination=0.1, # 异常比例的预期值
random_state=42
)
# 训练模型
clf.fit(X_train)
# 预测
y_pred_train = clf.predict(X_train)
y_pred_outliers = clf.predict(X_outliers)
print(f"训练集异常检测: {np.sum(y_pred_train == -1)} 个异常")
print(f"异常集检测结果: {np.sum(y_pred_outliers == -1)} 个被检测为异常")
重要参数
| 参数 | 说明 | 默认值 |
|---|---|---|
n_estimators | 树的数量 | 100 |
max_samples | 每棵树使用的样本数 | 'auto' |
contamination | 数据集中异常比例的预期值 | 'auto' |
max_features | 每棵树使用的特征数 | 1.0 |
random_state | 随机种子 | None |
contamination 参数:这是一个关键参数,用于确定异常的阈值。如果设置为 'auto',阈值将基于原始论文的建议。如果知道数据集中异常的大致比例,可以设置为具体数值(如 0.1 表示 10% 异常)。
异常分数的解释
# 获取异常分数
scores = clf.score_samples(X_train)
# 分数的含义:
# - 分数越小,越可能是异常
# - 负值表示异常程度高
# - 正值表示正常
# 找出最异常的样本
most_anomalous_idx = np.argmin(scores)
print(f"最异常的样本索引: {most_anomalous_idx}")
print(f"异常分数: {scores[most_anomalous_idx]:.4f}")
增量训练
Isolation Forest 支持 warm_start,可以增量添加树:
# 增量训练
clf = IsolationForest(n_estimators=10, warm_start=True, random_state=42)
clf.fit(X_train) # 训练 10 棵树
# 添加更多树
clf.set_params(n_estimators=20)
clf.fit(X_train) # 再添加 10 棵树
可视化示例
import matplotlib.pyplot as plt
# 训练模型
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(X_train)
# 创建网格用于可视化
xx, yy = np.meshgrid(np.linspace(-5, 5, 100), np.linspace(-5, 5, 100))
Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red') # 决策边界
plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='red', s=20, edgecolor='k')
plt.title('Isolation Forest 异常检测')
plt.show()
适用场景
Isolation Forest 特别适合:
- 高维数据
- 大规模数据集
- 异常点分布稀疏的场景
- 不需要对数据分布做假设
Local Outlier Factor(局部离群因子)
Local Outlier Factor(LOF)是一种基于密度的异常检测算法,它通过比较局部密度来识别异常。
算法原理
LOF 的核心思想是:异常点的局部密度显著低于其邻居的局部密度。
关键概念:
- k-距离:点到第 k 近邻居的距离
- 可达距离:max(两点距离, k-距离)
- 局部可达密度:基于可达距离计算的密度
- LOF 分数:邻居的平均局部密度与自身局部密度的比值
LOF 分数的解释:
- LOF ≈ 1:密度与邻居相似,正常点
- LOF > 1:密度低于邻居,可能是异常
- LOF >> 1:密度远低于邻居,很可能是异常
基本用法
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
# 创建 LOF 模型
lof = LocalOutlierFactor(
n_neighbors=20, # 邻居数量
contamination=0.1, # 异常比例预期
novelty=False # False 表示异常检测模式
)
# fit_predict 返回预测标签
y_pred = lof.fit_predict(X_train)
# 获取异常分数(负值表示异常)
# 注意:异常检测模式下,LOF 只有 fit_predict 方法
scores = lof.negative_outlier_factor_
print(f"检测到的异常数量: {np.sum(y_pred == -1)}")
print(f"异常分数范围: [{scores.min():.2f}, {scores.max():.2f}]")
重要参数
| 参数 | 说明 | 默认值 |
|---|---|---|
n_neighbors | 计算局部密度时考虑的邻居数 | 20 |
algorithm | 最近邻搜索算法 | 'auto' |
leaf_size | 树的叶子大小 | 30 |
metric | 距离度量 | 'minkowski' |
contamination | 异常比例预期 | 'auto' |
novelty | 是否用于新颖性检测 | False |
n_neighbors 的选择:
- 通常设置为大于最小簇的大小
- 一般 20 效果良好
- 如果异常比例高(>10%),考虑增大到 35 或更高
异常检测模式 vs 新颖性检测模式
LOF 有两种使用模式:
异常检测模式(novelty=False):
# 异常检测:训练数据包含异常
lof = LocalOutlierFactor(novelty=False)
y_pred = lof.fit_predict(X_train) # 只能使用 fit_predict
scores = lof.negative_outlier_factor_ # 访问异常分数
新颖性检测模式(novelty=True):
# 新颖性检测:训练数据干净,检测新数据
lof = LocalOutlierFactor(novelty=True)
lof.fit(X_train) # 训练
# 可以预测新数据
y_pred_new = lof.predict(X_new)
scores_new = lof.score_samples(X_new)
新颖性检测模式下,不要对训练数据使用 predict、decision_function 或 score_samples,这会导致错误结果。
LOF 的优势
LOF 算法能够处理不同密度的区域:
- 在稠密区域的异常点,其邻居的密度也高,但相对密度低
- 在稀疏区域的正常点,虽然绝对密度低,但与邻居密度相近
这使得 LOF 能够适应复杂的数据分布。
One-Class SVM(单类支持向量机)
One-Class SVM 是支持向量机的一种变体,专门用于新颖性检测。它学习正常数据的边界,将新数据分类为正常或异常。
算法原理
One-Class SVM 的目标是找到一个超平面,将所有正常数据包围在一个紧凑的区域内。它通过以下方式实现:
- 将数据映射到高维特征空间
- 找到一个超平面,使正常数据远离原点
- 新数据根据其与超平面的关系判断是否异常
数学表达:
其中 控制异常比例的上界, 是松弛变量。
基本用法
from sklearn.svm import OneClassSVM
# 创建 One-Class SVM
ocsvm = OneClassSVM(
kernel='rbf', # 核函数
gamma='scale', # 核参数
nu=0.05 # 异常比例上界
)
# 训练(假设训练数据是干净的)
ocsvm.fit(X_train)
# 预测新数据
y_pred = ocsvm.predict(X_test)
scores = ocsvm.score_samples(X_test)
print(f"预测结果: {np.unique(y_pred, return_counts=True)}")
重要参数
| 参数 | 说明 | 默认值 |
|---|---|---|
kernel | 核函数类型 | 'rbf' |
gamma | RBF、poly、sigmoid 核的系数 | 'scale' |
nu | 训练误差比例的上界 | 0.5 |
degree | 多项式核的次数 | 3 |
nu 参数:
- 取值范围 (0, 1]
- 控制训练误差比例的上界
- 也是支持向量比例的下界
- 通常设置为预期异常比例
gamma 参数:
'scale':1 / (n_features * X.var())'auto':1 / n_features- 也可以设置为具体数值
线性 One-Class SVM
对于大规模数据,可以使用线性版本的 One-Class SVM:
from sklearn.linear_model import SGDOneClassSVM
# 线性 One-Class SVM,适合大规模数据
sgd_ocsvm = SGDOneClassSVM(
nu=0.05,
tol=1e-4,
max_iter=1000
)
sgd_ocsvm.fit(X_train)
y_pred = sgd_ocsvm.predict(X_test)
线性版本的时间复杂度为 O(n),适合大规模数据集。
核近似
结合核近似技术,线性 SVM 可以模拟非线性核:
from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import SGDOneClassSVM
from sklearn.pipeline import make_pipeline
# 使用 Nystroem 核近似
feature_map_nystroem = Nystroem(
gamma=0.2,
random_state=42,
n_components=X_train.shape[1] # 特征数
)
# 构建管道
pipeline = make_pipeline(
feature_map_nystroem,
SGDOneClassSVM(nu=0.05)
)
pipeline.fit(X_train)
y_pred = pipeline.predict(X_test)
One-Class SVM 的局限性
One-Class SVM 对异常值敏感。如果训练数据包含异常,模型可能会过拟合。对于异常检测(训练数据包含异常),建议使用 Isolation Forest 或 LOF。
Elliptic Envelope(椭圆包络)
Elliptic Envelope 假设正常数据服从高斯分布,通过拟合椭圆来识别异常。
算法原理
Elliptic Envelope 使用鲁棒协方差估计来拟合数据的形状:
- 假设正常数据服从多变量高斯分布
- 使用最小协方差行列式(MCD)估计位置和协方差
- 计算马氏距离判断异常
马氏距离:
其中 是均值, 是协方差矩阵。
马氏距离考虑了特征之间的相关性和不同尺度,比欧氏距离更适合多维数据。
基本用法
from sklearn.covariance import EllipticEnvelope
# 创建椭圆包络模型
ee = EllipticEnvelope(
contamination=0.1, # 异常比例
random_state=42
)
# 训练
ee.fit(X_train)
# 预测
y_pred = ee.predict(X_test)
# 马氏距离
mahal_dist = ee.mahalanobis(X_test)
print(f"检测到的异常数量: {np.sum(y_pred == -1)}")
重要参数
| 参数 | 说明 | 默认值 |
|---|---|---|
store_precision | 是否存储精度矩阵 | True |
assume_centered | 是否假设数据中心化 | False |
support_fraction | MCD 中使用的样本比例 | None |
contamination | 异常比例 | 0.1 |
可视化示例
import matplotlib.pyplot as plt
# 训练模型
ee = EllipticEnvelope(contamination=0.1, random_state=42)
ee.fit(X_train)
# 创建网格
xx, yy = np.meshgrid(np.linspace(-5, 5, 100), np.linspace(-5, 5, 100))
Z = ee.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
plt.title('Elliptic Envelope 异常检测')
plt.show()
适用场景
Elliptic Envelope 适合:
- 数据近似服从高斯分布
- 特征之间存在相关性
- 数据维度不太高
局限性:
- 对非高斯分布效果差
- 高维数据可能不适用
- 对非线性边界效果差
算法比较与选择
性能对比
不同算法在不同数据集上的表现:
| 算法 | 时间复杂度 | 适合高维 | 适合大数据 | 需要干净训练数据 |
|---|---|---|---|---|
| Isolation Forest | O(n log n) | 是 | 是 | 否 |
| LOF | O(n²) 或 O(n log n) | 一般 | 一般 | 否/是(可配置) |
| One-Class SVM | O(n²) 到 O(n³) | 一般 | 否 | 是 |
| Elliptic Envelope | O(n) | 否 | 是 | 否 |
算法选择指南
选择 Isolation Forest 当:
- 数据维度高
- 数据集大
- 不确定数据分布
- 训练数据可能包含异常
选择 LOF 当:
- 数据有不同密度的区域
- 异常在局部密度上有显著差异
- 需要考虑局部结构
选择 One-Class SVM 当:
- 训练数据干净(无异常)
- 需要非线性边界
- 数据量适中
选择 Elliptic Envelope 当:
- 数据近似高斯分布
- 特征相关性重要
- 需要鲁棒的统计方法
综合示例
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
# 生成数据
X_train, _ = make_blobs(n_samples=200, centers=1, random_state=42)
X_train = np.vstack([X_train, np.random.uniform(low=-4, high=4, size=(20, 2))])
# 定义模型
models = {
'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
'LOF': LocalOutlierFactor(n_neighbors=20, contamination=0.1, novelty=True),
'One-Class SVM': OneClassSVM(nu=0.1),
'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
}
# 训练和可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
for ax, (name, model) in zip(axes.ravel(), models.items()):
model.fit(X_train)
# 创建网格
xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
Z = model.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)
# 绘制
ax.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
ax.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
ax.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
ax.set_title(name)
ax.set_xlim(-6, 6)
ax.set_ylim(-6, 6)
plt.tight_layout()
plt.show()
实际应用案例
欺诈检测示例
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# 假设有一个信用卡交易数据集
# 实际应用中,需要根据业务特征进行特征工程
# 创建示例数据
np.random.seed(42)
n_samples = 1000
# 正常交易
normal_transactions = pd.DataFrame({
'amount': np.random.exponential(100, n_samples * 0.9),
'time_hour': np.random.randint(0, 24, n_samples * 0.9),
'merchant_category': np.random.randint(1, 10, n_samples * 0.9)
})
# 异常交易(欺诈)
fraud_transactions = pd.DataFrame({
'amount': np.random.exponential(1000, n_samples * 0.1),
'time_hour': np.random.randint(0, 24, n_samples * 0.1),
'merchant_category': np.random.randint(1, 10, n_samples * 0.1)
})
# 合并数据
data = pd.concat([normal_transactions, fraud_transactions], ignore_index=True)
# 标准化
scaler = StandardScaler()
X = scaler.fit_transform(data)
# 训练异常检测模型
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(X)
# 预测
predictions = clf.predict(X)
data['is_fraud'] = predictions == -1
# 评估
print(f"检测到的欺诈交易数量: {data['is_fraud'].sum()}")
print(f"欺诈交易的平均金额: {data[data['is_fraud']]['amount'].mean():.2f}")
print(f"正常交易的平均金额: {data[~data['is_fraud']]['amount'].mean():.2f}")
设备故障预测
from sklearn.neighbors import LocalOutlierFactor
# 假设有设备传感器数据
# 正常运行时的传感器读数
normal_data = np.random.randn(1000, 3) * [10, 5, 2] + [50, 25, 100]
# 使用 LOF 检测异常
lof = LocalOutlierFactor(n_neighbors=35, contamination=0.05)
predictions = lof.fit_predict(normal_data)
# 找出异常读数
anomalies = normal_data[predictions == -1]
print(f"检测到 {len(anomalies)} 个异常读数")
# 监控新数据
new_data = np.array([[55, 30, 95], [80, 10, 150]]) # 新传感器读数
# 使用新颖性检测模式
lof_novelty = LocalOutlierFactor(n_neighbors=35, novelty=True)
lof_novelty.fit(normal_data)
new_predictions = lof_novelty.predict(new_data)
for i, pred in enumerate(new_predictions):
status = "异常" if pred == -1 else "正常"
print(f"传感器读数 {new_data[i]}: {status}")
最佳实践
数据预处理
异常检测对数据尺度敏感,建议进行标准化:
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
# 使用管道确保预处理一致性
pipeline = Pipeline([
('scaler', StandardScaler()),
('detector', IsolationForest(contamination=0.1, random_state=42))
])
pipeline.fit(X_train)
predictions = pipeline.predict(X_test)
contamination 参数的设定
如果不知道数据中异常的真实比例,可以:
- 从保守估计开始(如 0.05)
- 使用交叉验证评估
- 结合业务知识调整
# 网格搜索不同的 contamination 值
contaminations = [0.01, 0.05, 0.1, 0.15, 0.2]
for cont in contaminations:
clf = IsolationForest(contamination=cont, random_state=42)
predictions = clf.fit_predict(X)
n_outliers = (predictions == -1).sum()
print(f"contamination={cont}: 检测到 {n_outliers} 个异常")
结合多种方法
对于关键应用,可以结合多种方法提高可靠性:
from scipy import stats
# 多种方法的集成
iso_forest = IsolationForest(contamination=0.1, random_state=42)
lof = LocalOutlierFactor(n_neighbors=20, novelty=True, contamination=0.1)
iso_forest.fit(X_train)
lof.fit(X_train)
# 获取异常分数
iso_scores = iso_forest.score_samples(X_test)
lof_scores = lof.score_samples(X_test)
# 标准化分数
iso_scores_norm = stats.zscore(iso_scores)
lof_scores_norm = stats.zscore(lof_scores)
# 平均分数(越低越异常)
combined_scores = (iso_scores_norm + lof_scores_norm) / 2
# 使用阈值判断
threshold = -1.5 # 可根据需求调整
final_predictions = np.where(combined_scores < threshold, -1, 1)
小结
异常检测是机器学习的重要应用领域:
- 区分任务类型:异常检测 vs 新颖性检测
- 选择合适算法:根据数据特点选择 Isolation Forest、LOF、One-Class SVM 或 Elliptic Envelope
- 参数调优:特别是 contamination 参数
- 数据预处理:标准化是常见步骤
- 结合业务:异常检测的结果需要结合领域知识解释
异常检测在欺诈检测、故障诊断、入侵检测等领域有广泛应用。掌握这些方法是构建可靠异常检测系统的基础。