跳到主要内容

异常检测

异常检测(Anomaly Detection)是机器学习中的重要应用领域,旨在识别与正常数据模式显著不同的观测值。异常检测在欺诈检测、故障诊断、网络入侵检测、医疗诊断等领域有着广泛的应用。本章将介绍 sklearn 中提供的异常检测方法和算法。

异常检测基础概念

什么是异常?

异常(Anomaly),也称为离群点(Outlier),是指与大多数数据显著不同的数据点。异常可能源于数据录入错误、测量误差,也可能是真实但罕见的特殊事件。

异常的例子

  • 信用卡交易中的异常消费模式
  • 服务器日志中的异常请求
  • 医疗数据中的异常指标
  • 工业设备传感器中的异常读数

异常检测 vs 新颖性检测

sklearn 区分了两种相关但不同的任务:

异常检测(Outlier Detection):训练数据本身包含异常值,目标是识别这些异常。这是一个无监督学习问题,因为事先不知道哪些是异常。

新颖性检测(Novelty Detection):训练数据是"干净"的(不包含异常),目标是检测新观测是否为异常。这是一个半监督学习问题,因为训练数据代表正常模式。

特性异常检测新颖性检测
训练数据包含异常值不包含异常值
学习方式无监督半监督
应用场景数据清洗、异常识别新数据判断
可用方法predict 新数据fit_predict 训练数据

异常检测的输出

sklearn 中的异常检测器遵循统一的接口:

  • 标签预测predict() 返回 1(正常)或 -1(异常)
  • 异常分数score_samples() 返回异常分数,值越小越异常
  • 决策函数decision_function() 返回决策值,负值表示异常
from sklearn.ensemble import IsolationForest

# 创建并训练模型
clf = IsolationForest(random_state=42)
clf.fit(X_train)

# 预测:1 为正常,-1 为异常
predictions = clf.predict(X_test)

# 异常分数:越小越异常
scores = clf.score_samples(X_test)

Isolation Forest(孤立森林)

Isolation Forest 是一种高效的异常检测算法,特别适合高维数据。它基于一个简单而强大的思想:异常点更容易被"孤立"。

算法原理

Isolation Forest 通过随机选择特征和随机选择分割值来构建决策树。异常点通常具有以下特点:

  1. 异常点数量少
  2. 异常点的特征值与正常点差异大

这两个特点使得异常点在随机分割过程中更容易被早期隔离,即从根节点到异常点的路径更短。

路径长度的意义

  • 路径短 → 容易被隔离 → 可能是异常
  • 路径长 → 不容易被隔离 → 可能是正常

通过构建多棵随机树并平均路径长度,可以得到稳健的异常分数。

基本用法

from sklearn.ensemble import IsolationForest
import numpy as np

# 生成示例数据
rng = np.random.RandomState(42)
X = 0.3 * rng.randn(100, 2)
X_train = np.r_[X + 2, X - 2] # 正常数据
X_outliers = rng.uniform(low=-4, high=4, size=(20, 2)) # 异常数据

# 创建孤立森林模型
clf = IsolationForest(
n_estimators=100, # 树的数量
max_samples='auto', # 每棵树使用的样本数
contamination=0.1, # 异常比例的预期值
random_state=42
)

# 训练模型
clf.fit(X_train)

# 预测
y_pred_train = clf.predict(X_train)
y_pred_outliers = clf.predict(X_outliers)

print(f"训练集异常检测: {np.sum(y_pred_train == -1)} 个异常")
print(f"异常集检测结果: {np.sum(y_pred_outliers == -1)} 个被检测为异常")

重要参数

参数说明默认值
n_estimators树的数量100
max_samples每棵树使用的样本数'auto'
contamination数据集中异常比例的预期值'auto'
max_features每棵树使用的特征数1.0
random_state随机种子None

contamination 参数:这是一个关键参数,用于确定异常的阈值。如果设置为 'auto',阈值将基于原始论文的建议。如果知道数据集中异常的大致比例,可以设置为具体数值(如 0.1 表示 10% 异常)。

异常分数的解释

# 获取异常分数
scores = clf.score_samples(X_train)

# 分数的含义:
# - 分数越小,越可能是异常
# - 负值表示异常程度高
# - 正值表示正常

# 找出最异常的样本
most_anomalous_idx = np.argmin(scores)
print(f"最异常的样本索引: {most_anomalous_idx}")
print(f"异常分数: {scores[most_anomalous_idx]:.4f}")

增量训练

Isolation Forest 支持 warm_start,可以增量添加树:

# 增量训练
clf = IsolationForest(n_estimators=10, warm_start=True, random_state=42)
clf.fit(X_train) # 训练 10 棵树

# 添加更多树
clf.set_params(n_estimators=20)
clf.fit(X_train) # 再添加 10 棵树

可视化示例

import matplotlib.pyplot as plt

# 训练模型
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(X_train)

# 创建网格用于可视化
xx, yy = np.meshgrid(np.linspace(-5, 5, 100), np.linspace(-5, 5, 100))
Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

# 绘制
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red') # 决策边界
plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='red', s=20, edgecolor='k')
plt.title('Isolation Forest 异常检测')
plt.show()

适用场景

Isolation Forest 特别适合:

  • 高维数据
  • 大规模数据集
  • 异常点分布稀疏的场景
  • 不需要对数据分布做假设

Local Outlier Factor(局部离群因子)

Local Outlier Factor(LOF)是一种基于密度的异常检测算法,它通过比较局部密度来识别异常。

算法原理

LOF 的核心思想是:异常点的局部密度显著低于其邻居的局部密度。

关键概念

  1. k-距离:点到第 k 近邻居的距离
  2. 可达距离:max(两点距离, k-距离)
  3. 局部可达密度:基于可达距离计算的密度
  4. LOF 分数:邻居的平均局部密度与自身局部密度的比值

LOF 分数的解释

  • LOF ≈ 1:密度与邻居相似,正常点
  • LOF > 1:密度低于邻居,可能是异常
  • LOF >> 1:密度远低于邻居,很可能是异常

基本用法

from sklearn.neighbors import LocalOutlierFactor
import numpy as np

# 创建 LOF 模型
lof = LocalOutlierFactor(
n_neighbors=20, # 邻居数量
contamination=0.1, # 异常比例预期
novelty=False # False 表示异常检测模式
)

# fit_predict 返回预测标签
y_pred = lof.fit_predict(X_train)

# 获取异常分数(负值表示异常)
# 注意:异常检测模式下,LOF 只有 fit_predict 方法
scores = lof.negative_outlier_factor_

print(f"检测到的异常数量: {np.sum(y_pred == -1)}")
print(f"异常分数范围: [{scores.min():.2f}, {scores.max():.2f}]")

重要参数

参数说明默认值
n_neighbors计算局部密度时考虑的邻居数20
algorithm最近邻搜索算法'auto'
leaf_size树的叶子大小30
metric距离度量'minkowski'
contamination异常比例预期'auto'
novelty是否用于新颖性检测False

n_neighbors 的选择

  • 通常设置为大于最小簇的大小
  • 一般 20 效果良好
  • 如果异常比例高(>10%),考虑增大到 35 或更高

异常检测模式 vs 新颖性检测模式

LOF 有两种使用模式:

异常检测模式(novelty=False)

# 异常检测:训练数据包含异常
lof = LocalOutlierFactor(novelty=False)
y_pred = lof.fit_predict(X_train) # 只能使用 fit_predict
scores = lof.negative_outlier_factor_ # 访问异常分数

新颖性检测模式(novelty=True)

# 新颖性检测:训练数据干净,检测新数据
lof = LocalOutlierFactor(novelty=True)
lof.fit(X_train) # 训练

# 可以预测新数据
y_pred_new = lof.predict(X_new)
scores_new = lof.score_samples(X_new)
警告

新颖性检测模式下,不要对训练数据使用 predictdecision_functionscore_samples,这会导致错误结果。

LOF 的优势

LOF 算法能够处理不同密度的区域:

  • 在稠密区域的异常点,其邻居的密度也高,但相对密度低
  • 在稀疏区域的正常点,虽然绝对密度低,但与邻居密度相近

这使得 LOF 能够适应复杂的数据分布。

One-Class SVM(单类支持向量机)

One-Class SVM 是支持向量机的一种变体,专门用于新颖性检测。它学习正常数据的边界,将新数据分类为正常或异常。

算法原理

One-Class SVM 的目标是找到一个超平面,将所有正常数据包围在一个紧凑的区域内。它通过以下方式实现:

  1. 将数据映射到高维特征空间
  2. 找到一个超平面,使正常数据远离原点
  3. 新数据根据其与超平面的关系判断是否异常

数学表达

minw,ρ,ξ12w2+1νniξiρ\min_{w, \rho, \xi} \frac{1}{2}\|w\|^2 + \frac{1}{\nu n}\sum_{i}\xi_i - \rho

其中 ν\nu 控制异常比例的上界,ξi\xi_i 是松弛变量。

基本用法

from sklearn.svm import OneClassSVM

# 创建 One-Class SVM
ocsvm = OneClassSVM(
kernel='rbf', # 核函数
gamma='scale', # 核参数
nu=0.05 # 异常比例上界
)

# 训练(假设训练数据是干净的)
ocsvm.fit(X_train)

# 预测新数据
y_pred = ocsvm.predict(X_test)
scores = ocsvm.score_samples(X_test)

print(f"预测结果: {np.unique(y_pred, return_counts=True)}")

重要参数

参数说明默认值
kernel核函数类型'rbf'
gammaRBF、poly、sigmoid 核的系数'scale'
nu训练误差比例的上界0.5
degree多项式核的次数3

nu 参数

  • 取值范围 (0, 1]
  • 控制训练误差比例的上界
  • 也是支持向量比例的下界
  • 通常设置为预期异常比例

gamma 参数

  • 'scale':1 / (n_features * X.var())
  • 'auto':1 / n_features
  • 也可以设置为具体数值

线性 One-Class SVM

对于大规模数据,可以使用线性版本的 One-Class SVM:

from sklearn.linear_model import SGDOneClassSVM

# 线性 One-Class SVM,适合大规模数据
sgd_ocsvm = SGDOneClassSVM(
nu=0.05,
tol=1e-4,
max_iter=1000
)

sgd_ocsvm.fit(X_train)
y_pred = sgd_ocsvm.predict(X_test)

线性版本的时间复杂度为 O(n),适合大规模数据集。

核近似

结合核近似技术,线性 SVM 可以模拟非线性核:

from sklearn.kernel_approximation import Nystroem
from sklearn.linear_model import SGDOneClassSVM
from sklearn.pipeline import make_pipeline

# 使用 Nystroem 核近似
feature_map_nystroem = Nystroem(
gamma=0.2,
random_state=42,
n_components=X_train.shape[1] # 特征数
)

# 构建管道
pipeline = make_pipeline(
feature_map_nystroem,
SGDOneClassSVM(nu=0.05)
)

pipeline.fit(X_train)
y_pred = pipeline.predict(X_test)

One-Class SVM 的局限性

One-Class SVM 对异常值敏感。如果训练数据包含异常,模型可能会过拟合。对于异常检测(训练数据包含异常),建议使用 Isolation Forest 或 LOF。

Elliptic Envelope(椭圆包络)

Elliptic Envelope 假设正常数据服从高斯分布,通过拟合椭圆来识别异常。

算法原理

Elliptic Envelope 使用鲁棒协方差估计来拟合数据的形状:

  1. 假设正常数据服从多变量高斯分布
  2. 使用最小协方差行列式(MCD)估计位置和协方差
  3. 计算马氏距离判断异常

马氏距离

dM(x)=(xμ)TΣ1(xμ)d_M(x) = \sqrt{(x - \mu)^T \Sigma^{-1} (x - \mu)}

其中 μ\mu 是均值,Σ\Sigma 是协方差矩阵。

马氏距离考虑了特征之间的相关性和不同尺度,比欧氏距离更适合多维数据。

基本用法

from sklearn.covariance import EllipticEnvelope

# 创建椭圆包络模型
ee = EllipticEnvelope(
contamination=0.1, # 异常比例
random_state=42
)

# 训练
ee.fit(X_train)

# 预测
y_pred = ee.predict(X_test)

# 马氏距离
mahal_dist = ee.mahalanobis(X_test)

print(f"检测到的异常数量: {np.sum(y_pred == -1)}")

重要参数

参数说明默认值
store_precision是否存储精度矩阵True
assume_centered是否假设数据中心化False
support_fractionMCD 中使用的样本比例None
contamination异常比例0.1

可视化示例

import matplotlib.pyplot as plt

# 训练模型
ee = EllipticEnvelope(contamination=0.1, random_state=42)
ee.fit(X_train)

# 创建网格
xx, yy = np.meshgrid(np.linspace(-5, 5, 100), np.linspace(-5, 5, 100))
Z = ee.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

# 绘制
plt.figure(figsize=(10, 8))
plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
plt.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
plt.title('Elliptic Envelope 异常检测')
plt.show()

适用场景

Elliptic Envelope 适合:

  • 数据近似服从高斯分布
  • 特征之间存在相关性
  • 数据维度不太高

局限性

  • 对非高斯分布效果差
  • 高维数据可能不适用
  • 对非线性边界效果差

算法比较与选择

性能对比

不同算法在不同数据集上的表现:

算法时间复杂度适合高维适合大数据需要干净训练数据
Isolation ForestO(n log n)
LOFO(n²) 或 O(n log n)一般一般否/是(可配置)
One-Class SVMO(n²) 到 O(n³)一般
Elliptic EnvelopeO(n)

算法选择指南

选择 Isolation Forest 当

  • 数据维度高
  • 数据集大
  • 不确定数据分布
  • 训练数据可能包含异常

选择 LOF 当

  • 数据有不同密度的区域
  • 异常在局部密度上有显著差异
  • 需要考虑局部结构

选择 One-Class SVM 当

  • 训练数据干净(无异常)
  • 需要非线性边界
  • 数据量适中

选择 Elliptic Envelope 当

  • 数据近似高斯分布
  • 特征相关性重要
  • 需要鲁棒的统计方法

综合示例

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt

# 生成数据
X_train, _ = make_blobs(n_samples=200, centers=1, random_state=42)
X_train = np.vstack([X_train, np.random.uniform(low=-4, high=4, size=(20, 2))])

# 定义模型
models = {
'Isolation Forest': IsolationForest(contamination=0.1, random_state=42),
'LOF': LocalOutlierFactor(n_neighbors=20, contamination=0.1, novelty=True),
'One-Class SVM': OneClassSVM(nu=0.1),
'Elliptic Envelope': EllipticEnvelope(contamination=0.1, random_state=42)
}

# 训练和可视化
fig, axes = plt.subplots(2, 2, figsize=(14, 12))

for ax, (name, model) in zip(axes.ravel(), models.items()):
model.fit(X_train)

# 创建网格
xx, yy = np.meshgrid(np.linspace(-6, 6, 100), np.linspace(-6, 6, 100))
Z = model.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

# 绘制
ax.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap='Blues_r')
ax.contour(xx, yy, Z, levels=[0], linewidths=2, colors='red')
ax.scatter(X_train[:, 0], X_train[:, 1], c='white', s=20, edgecolor='k')
ax.set_title(name)
ax.set_xlim(-6, 6)
ax.set_ylim(-6, 6)

plt.tight_layout()
plt.show()

实际应用案例

欺诈检测示例

import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# 假设有一个信用卡交易数据集
# 实际应用中,需要根据业务特征进行特征工程

# 创建示例数据
np.random.seed(42)
n_samples = 1000

# 正常交易
normal_transactions = pd.DataFrame({
'amount': np.random.exponential(100, n_samples * 0.9),
'time_hour': np.random.randint(0, 24, n_samples * 0.9),
'merchant_category': np.random.randint(1, 10, n_samples * 0.9)
})

# 异常交易(欺诈)
fraud_transactions = pd.DataFrame({
'amount': np.random.exponential(1000, n_samples * 0.1),
'time_hour': np.random.randint(0, 24, n_samples * 0.1),
'merchant_category': np.random.randint(1, 10, n_samples * 0.1)
})

# 合并数据
data = pd.concat([normal_transactions, fraud_transactions], ignore_index=True)

# 标准化
scaler = StandardScaler()
X = scaler.fit_transform(data)

# 训练异常检测模型
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(X)

# 预测
predictions = clf.predict(X)
data['is_fraud'] = predictions == -1

# 评估
print(f"检测到的欺诈交易数量: {data['is_fraud'].sum()}")
print(f"欺诈交易的平均金额: {data[data['is_fraud']]['amount'].mean():.2f}")
print(f"正常交易的平均金额: {data[~data['is_fraud']]['amount'].mean():.2f}")

设备故障预测

from sklearn.neighbors import LocalOutlierFactor

# 假设有设备传感器数据
# 正常运行时的传感器读数
normal_data = np.random.randn(1000, 3) * [10, 5, 2] + [50, 25, 100]

# 使用 LOF 检测异常
lof = LocalOutlierFactor(n_neighbors=35, contamination=0.05)
predictions = lof.fit_predict(normal_data)

# 找出异常读数
anomalies = normal_data[predictions == -1]
print(f"检测到 {len(anomalies)} 个异常读数")

# 监控新数据
new_data = np.array([[55, 30, 95], [80, 10, 150]]) # 新传感器读数

# 使用新颖性检测模式
lof_novelty = LocalOutlierFactor(n_neighbors=35, novelty=True)
lof_novelty.fit(normal_data)
new_predictions = lof_novelty.predict(new_data)

for i, pred in enumerate(new_predictions):
status = "异常" if pred == -1 else "正常"
print(f"传感器读数 {new_data[i]}: {status}")

最佳实践

数据预处理

异常检测对数据尺度敏感,建议进行标准化:

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

# 使用管道确保预处理一致性
pipeline = Pipeline([
('scaler', StandardScaler()),
('detector', IsolationForest(contamination=0.1, random_state=42))
])

pipeline.fit(X_train)
predictions = pipeline.predict(X_test)

contamination 参数的设定

如果不知道数据中异常的真实比例,可以:

  1. 从保守估计开始(如 0.05)
  2. 使用交叉验证评估
  3. 结合业务知识调整
# 网格搜索不同的 contamination 值
contaminations = [0.01, 0.05, 0.1, 0.15, 0.2]

for cont in contaminations:
clf = IsolationForest(contamination=cont, random_state=42)
predictions = clf.fit_predict(X)
n_outliers = (predictions == -1).sum()
print(f"contamination={cont}: 检测到 {n_outliers} 个异常")

结合多种方法

对于关键应用,可以结合多种方法提高可靠性:

from scipy import stats

# 多种方法的集成
iso_forest = IsolationForest(contamination=0.1, random_state=42)
lof = LocalOutlierFactor(n_neighbors=20, novelty=True, contamination=0.1)

iso_forest.fit(X_train)
lof.fit(X_train)

# 获取异常分数
iso_scores = iso_forest.score_samples(X_test)
lof_scores = lof.score_samples(X_test)

# 标准化分数
iso_scores_norm = stats.zscore(iso_scores)
lof_scores_norm = stats.zscore(lof_scores)

# 平均分数(越低越异常)
combined_scores = (iso_scores_norm + lof_scores_norm) / 2

# 使用阈值判断
threshold = -1.5 # 可根据需求调整
final_predictions = np.where(combined_scores < threshold, -1, 1)

小结

异常检测是机器学习的重要应用领域:

  1. 区分任务类型:异常检测 vs 新颖性检测
  2. 选择合适算法:根据数据特点选择 Isolation Forest、LOF、One-Class SVM 或 Elliptic Envelope
  3. 参数调优:特别是 contamination 参数
  4. 数据预处理:标准化是常见步骤
  5. 结合业务:异常检测的结果需要结合领域知识解释

异常检测在欺诈检测、故障诊断、入侵检测等领域有广泛应用。掌握这些方法是构建可靠异常检测系统的基础。

参考资料