跳到主要内容

常见安全攻击与防御

认证系统是攻击者的首要目标。了解常见攻击手法及其防御策略,是构建安全系统的基础。本章深入分析认证相关的安全威胁,并提供具体的防御实现。

攻击分类概览

凭据攻击

暴力破解

攻击者尝试所有可能的密码组合,直到找到正确的密码。

攻击特征

  • 大量来自同一 IP 或少数 IP 的登录请求
  • 尝试常见密码(如 "password123", "123456")
  • 针对同一账户的密集尝试

防御措施

1. 账户锁定策略

from datetime import datetime, timedelta
from flask import request

class AccountLockout:
def __init__(self, redis_client):
self.redis = redis_client
self.max_attempts = 5
self.lockout_duration = 900 # 15 分钟
self.observation_window = 300 # 5 分钟观察窗口

def record_failed_attempt(self, user_id: str):
"""记录失败尝试"""
key = f"lockout:attempts:{user_id}"
attempts = self.redis.incr(key)

if attempts == 1:
self.redis.expire(key, self.observation_window)

if attempts >= self.max_attempts:
self._lock_account(user_id)

return attempts

def _lock_account(self, user_id: str):
"""锁定账户"""
key = f"lockout:locked:{user_id}"
self.redis.setex(key, self.lockout_duration, "1")

def is_locked(self, user_id: str) -> bool:
"""检查账户是否锁定"""
key = f"lockout:locked:{user_id}"
return self.redis.exists(key)

def get_remaining_lockout_time(self, user_id: str) -> int:
"""获取剩余锁定时间"""
key = f"lockout:locked:{user_id}"
return self.redis.ttl(key)

def reset_attempts(self, user_id: str):
"""登录成功后重置尝试次数"""
key = f"lockout:attempts:{user_id}"
self.redis.delete(key)


# 使用示例
@app.route('/login', methods=['POST'])
def login():
user_id = get_user_id_by_email(request.form['email'])

lockout = AccountLockout(redis_client)

# 检查是否锁定
if lockout.is_locked(user_id):
remaining = lockout.get_remaining_lockout_time(user_id)
return jsonify({
'error': f'账户已锁定,请 {remaining} 秒后重试'
}), 429

# 验证凭据
if verify_credentials(request.form['email'], request.form['password']):
lockout.reset_attempts(user_id)
return jsonify({'token': generate_token(user_id)})
else:
attempts = lockout.record_failed_attempt(user_id)
remaining = AccountLockout(redis_client).max_attempts - attempts
return jsonify({
'error': '邮箱或密码错误',
'remaining_attempts': max(0, remaining)
}), 401

2. 指数退避

比固定锁定时间更友好,随失败次数增加等待时间:

// Node.js - 指数退避实现
class ExponentialBackoff {
constructor(redis) {
this.redis = redis;
this.baseDelay = 1; // 初始延迟 1 秒
this.maxDelay = 3600; // 最大延迟 1 小时
this.multiplier = 2; // 倍数
}

async recordFailedAttempt(userId) {
const key = `backoff:attempts:${userId}`;
const attempts = await this.redis.incr(key);

if (attempts === 1) {
await this.redis.expire(key, 86400); // 24 小时过期
}

// 计算延迟:baseDelay * (multiplier ^ (attempts - 1))
const delay = Math.min(
this.baseDelay * Math.pow(this.multiplier, attempts - 1),
this.maxDelay
);

// 设置锁定
const lockKey = `backoff:locked:${userId}`;
await this.redis.setex(lockKey, delay, attempts.toString());

return { attempts, delay };
}

async getBackoffStatus(userId) {
const lockKey = `backoff:locked:${userId}`;
const remaining = await this.redis.ttl(lockKey);

if (remaining > 0) {
return {
isLocked: true,
remainingSeconds: remaining,
};
}

return { isLocked: false };
}
}

3. CAPTCHA

在多次失败后要求完成 CAPTCHA:

# Python - 集成 CAPTCHA
import requests

def verify_recaptcha(token: str, remote_ip: str) -> bool:
"""验证 reCAPTCHA v3"""
response = requests.post(
'https://www.google.com/recaptcha/api/siteverify',
data={
'secret': RECAPTCHA_SECRET_KEY,
'response': token,
'remoteip': remote_ip,
}
)

result = response.json()

# reCAPTCHA v3 返回 0-1 的分数
# 0.5 是推荐的阈值,可根据业务调整
return result.get('success', False) and result.get('score', 0) >= 0.5


@app.route('/login', methods=['POST'])
def login():
email = request.form['email']
user_id = get_user_id_by_email(email)

lockout = AccountLockout(redis_client)

# 检查是否需要 CAPTCHA
attempts_key = f"lockout:attempts:{user_id}"
attempts = int(redis_client.get(attempts_key) or 0)

require_captcha = attempts >= 2

if require_captcha:
captcha_token = request.form.get('recaptcha_token')
if not captcha_token or not verify_recaptcha(captcha_token, request.remote_addr):
return jsonify({
'error': '请完成人机验证',
'require_captcha': True,
}), 400

# 继续登录流程...

撞库攻击(Credential Stuffing)

攻击者使用从其他网站泄露的用户名密码组合进行批量登录尝试。

攻击特征

  • 大量来自不同 IP 的登录请求
  • 使用已知的泄露凭据
  • 可能使用自动化工具和代理池

防御措施

1. 泄露密码检测

# 注册和修改密码时检查密码是否已泄露
async def check_password_breach(password: str) -> dict:
"""检查密码是否在已知泄露数据库中"""
import hashlib

sha1_hash = hashlib.sha1(password.encode()).hexdigest().upper()
prefix = sha1_hash[:5]
suffix = sha1_hash[5:]

# 查询 Have I Been Pwned API
async with aiohttp.ClientSession() as session:
url = f"https://api.pwnedpasswords.com/range/{prefix}"
async with session.get(url) as response:
text = await response.text()

# 解析响应
for line in text.split('\n'):
hash_suffix, count = line.split(':')
if hash_suffix == suffix:
return {
'breached': True,
'count': int(count),
'message': f'该密码已出现在 {int(count):,} 次数据泄露中',
}

return {'breached': False, 'count': 0}

2. IP 信誉评分

# Python - IP 信誉检查
class IPRiskAssessment:
def __init__(self, redis_client):
self.redis = redis_client

def assess_risk(self, ip: str) -> dict:
"""评估 IP 风险"""
risk_score = 0
risk_factors = []

# 1. 检查是否在黑名单中
if self.redis.sismember('ip:blacklist', ip):
risk_score += 50
risk_factors.append('ip_blacklisted')

# 2. 检查短期请求频率
key = f"ip:requests:{ip}"
requests = int(self.redis.get(key) or 0)
if requests > 100: # 1 分钟内超过 100 次请求
risk_score += 30
risk_factors.append('high_request_rate')

# 3. 检查失败登录次数
fail_key = f"ip:failed_logins:{ip}"
failed_logins = int(self.redis.get(fail_key) or 0)
if failed_logins > 10:
risk_score += 20
risk_factors.append('high_failure_rate')

# 4. 检查是否为已知 VPN/代理 IP(可接入第三方服务)
# is_vpn = check_vpn_service(ip)
# if is_vpn:
# risk_score += 15
# risk_factors.append('vpn_proxy')

return {
'risk_score': risk_score,
'risk_level': 'high' if risk_score >= 50 else 'medium' if risk_score >= 20 else 'low',
'risk_factors': risk_factors,
}

def should_block(self, ip: str) -> bool:
"""决定是否阻止该 IP"""
assessment = self.assess_risk(ip)
return assessment['risk_score'] >= 70

3. 多因素认证

MFA 是防止撞库攻击最有效的手段。即使密码泄露,攻击者仍需要第二因素才能登录。

# 登录时检查是否需要 MFA
@app.route('/login', methods=['POST'])
def login():
user = authenticate_user(request.form['email'], request.form['password'])

if not user:
return jsonify({'error': '邮箱或密码错误'}), 401

# 如果启用了 MFA,要求验证第二因素
if user.mfa_enabled:
# 创建临时会话,等待 MFA 验证
temp_token = generate_temp_token(user.id)
return jsonify({
'require_mfa': True,
'temp_token': temp_token,
'mfa_methods': ['totp', 'backup_code'],
})

return jsonify({'token': generate_auth_token(user.id)})

密码喷洒(Password Spraying)

攻击者对大量账户尝试少数几个常见密码,避免触发单个账户的锁定机制。

攻击特征

  • 同一 IP 对多个账户尝试登录
  • 使用常见密码列表
  • 分散在时间上进行

防御措施

# 全局速率限制 + IP 限制
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"],
)

# 登录端点的严格限制
@app.route('/login', methods=['POST'])
@limiter.limit("10 per minute") # 每个 IP 每分钟最多 10 次
def login():
# 额外的全局密码尝试限制
global_key = "global:login_attempts"
global_attempts = redis_client.incr(global_key)

if global_attempts == 1:
redis_client.expire(global_key, 60) # 1 分钟窗口

if global_attempts > 1000: # 全局每分钟限制
return jsonify({'error': '服务繁忙,请稍后重试'}), 429

# 继续登录验证...

会话攻击

会话劫持

攻击者窃取用户的会话标识符,冒充用户身份。

常见劫持方式

方式说明危险程度
XSS通过跨站脚本窃取 Cookie
网络嗅探在不安全网络中截获 Cookie
物理访问直接访问用户设备
日志泄露会话 ID 出现在日志中

防御措施

# Flask - 安全的 Cookie 配置
from flask import session

app.config.update(
SESSION_COOKIE_SECURE=True, # 仅 HTTPS
SESSION_COOKIE_HTTPONLY=True, # 禁止 JavaScript 访问
SESSION_COOKIE_SAMESITE='Lax', # 防止 CSRF
PERMANENT_SESSION_LIFETIME=timedelta(minutes=30),
)

# 或者手动设置
@app.route('/login', methods=['POST'])
def login():
# ... 认证逻辑 ...

response = jsonify({'success': True})
response.set_cookie(
'session_id',
session_id,
max_age=1800,
httponly=True,
secure=True,
samesite='Lax',
)
return response

会话绑定

将会话与客户端特征绑定,检测异常:

import hashlib

class SessionBinding:
@staticmethod
def generate_fingerprint(request) -> str:
"""生成客户端指纹"""
components = [
request.headers.get('User-Agent', ''),
request.headers.get('Accept-Language', ''),
request.headers.get('Accept-Encoding', ''),
]
combined = '|'.join(components)
return hashlib.sha256(combined.encode()).hexdigest()[:16]

@staticmethod
def bind_session(session_id: str, request):
"""绑定会话到客户端"""
fingerprint = SessionBinding.generate_fingerprint(request)
redis_client.hset(
f"session:{session_id}",
mapping={
'fingerprint': fingerprint,
'ip': request.remote_addr,
'created_at': datetime.utcnow().isoformat(),
}
)

@staticmethod
def verify_binding(session_id: str, request) -> bool:
"""验证会话绑定"""
session_data = redis_client.hgetall(f"session:{session_id}")

if not session_data:
return False

current_fingerprint = SessionBinding.generate_fingerprint(request)
stored_fingerprint = session_data.get('fingerprint')

# 指纹不匹配可能意味着会话被劫持
if current_fingerprint != stored_fingerprint:
# 记录安全事件
log_security_event(
event='session_binding_mismatch',
session_id=session_id,
stored_fingerprint=stored_fingerprint,
current_fingerprint=current_fingerprint,
ip=request.remote_addr,
)
return False

return True


# 在每个请求中验证
@app.before_request
def check_session_binding():
session_id = request.cookies.get('session_id')
if session_id and not SessionBinding.verify_binding(session_id, request):
# 销毁会话并要求重新登录
invalidate_session(session_id)
return jsonify({'error': '会话验证失败,请重新登录'}), 401

会话固定攻击

攻击者诱导用户使用攻击者预设的会话 ID,从而在用户登录后获取会话访问权。

攻击场景

  1. 攻击者获取一个有效的会话 ID(如 SESSION=abc123
  2. 攻击者诱导受害者点击链接 https://example.com?SESSION=abc123
  3. 受害者使用该会话 ID 登录
  4. 攻击者使用相同的会话 ID 访问受害者账户

防御措施

# 关键:认证成功后重新生成会话 ID
@app.route('/login', methods=['POST'])
def login():
user = authenticate_user(request.form['email'], request.form['password'])

if user:
# 重要:登录成功后重新生成会话
session.clear()
session.regenerate() # Flask 内置方法

# 或手动实现
old_session_id = request.cookies.get('session_id')
if old_session_id:
redis_client.delete(f"session:{old_session_id}")

new_session_id = secrets.token_urlsafe(32)
session['user_id'] = user.id
session['created_at'] = datetime.utcnow().isoformat()

# 绑定会话
SessionBinding.bind_session(new_session_id, request)

response = jsonify({'success': True})
response.set_cookie('session_id', new_session_id, httponly=True, secure=True)
return response

return jsonify({'error': '邮箱或密码错误'}), 401

CSRF(跨站请求伪造)

攻击者诱导用户在已认证状态下执行非预期操作。

攻击示例

<!-- 攻击者的网站 evil.com -->
<img src="https://bank.example.com/transfer?to=attacker&amount=10000" />

如果用户已登录银行网站,该请求会携带用户的 Cookie,导致转账执行。

防御措施

# 设置 SameSite 属性
response.set_cookie(
'session_id',
session_id,
httponly=True,
secure=True,
samesite='Strict', # 或 'Lax'
)
SameSite 值行为
Strict完全禁止跨站请求携带 Cookie
Lax允许安全的跨站 GET 请求(推荐)
None允许所有跨站请求(需配合 Secure)

2. CSRF Token

import secrets
from flask import request

# 生成 CSRF Token
def generate_csrf_token() -> str:
return secrets.token_urlsafe(32)

# 设置和验证 CSRF Token
@app.before_request
def csrf_protection():
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
token = request.form.get('csrf_token') or request.headers.get('X-CSRF-Token')
session_token = session.get('csrf_token')

if not token or not session_token or not secrets.compare_digest(token, session_token):
return jsonify({'error': 'CSRF 验证失败'}), 403


# 在表单中包含 CSRF Token
@app.route('/form')
def form():
csrf_token = generate_csrf_token()
session['csrf_token'] = csrf_token
return render_template('form.html', csrf_token=csrf_token)
<!-- 表单中包含 CSRF Token -->
<form method="POST" action="/transfer">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<input type="text" name="amount">
<button type="submit">转账</button>
</form>

<!-- 或通过 AJAX 提交 -->
<script>
fetch('/transfer', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': document.querySelector('meta[name="csrf-token"]').content,
},
body: JSON.stringify({ amount: 1000 }),
});
</script>

适用于无状态 API:

// 前端:从 Cookie 读取 CSRF Token 并放入请求头
function getCookie(name) {
const value = `; ${document.cookie}`;
const parts = value.split(`; ${name}=`);
if (parts.length === 2) return parts.pop().split(';').shift();
}

fetch('/api/transfer', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': getCookie('csrf_token'),
},
body: JSON.stringify({ amount: 1000 }),
});
# 后端验证
@app.before_request
def verify_double_submit():
if request.method in ['POST', 'PUT', 'DELETE', 'PATCH']:
cookie_token = request.cookies.get('csrf_token')
header_token = request.headers.get('X-CSRF-Token')

if not cookie_token or not header_token:
return jsonify({'error': '缺少 CSRF Token'}), 403

if not secrets.compare_digest(cookie_token, header_token):
return jsonify({'error': 'CSRF 验证失败'}), 403

协议攻击

重放攻击

攻击者截获并重复发送有效的请求或令牌。

防御措施

1. Nonce(一次性值)

import time

class NonceValidator:
def __init__(self, redis_client, ttl=300):
self.redis = redis_client
self.ttl = ttl # 有效期 5 分钟

def generate_nonce(self) -> str:
"""生成 Nonce"""
return secrets.token_urlsafe(32)

def validate_nonce(self, nonce: str) -> bool:
"""验证 Nonce(使用后立即失效)"""
key = f"nonce:{nonce}"

# 使用 SETNX 实现原子性检查
if self.redis.setnx(key, '1'):
self.redis.expire(key, self.ttl)
return True

return False # Nonce 已被使用


# 使用示例
@app.route('/api/sensitive-action', methods=['POST'])
def sensitive_action():
nonce = request.headers.get('X-Request-Nonce')

if not nonce or not NonceValidator(redis_client).validate_nonce(nonce):
return jsonify({'error': '无效或重复的请求'}), 400

# 处理请求...

2. 时间戳验证

class TimestampValidator:
MAX_SKEW = 300 # 允许 5 分钟时间偏差

@staticmethod
def validate(timestamp: int) -> bool:
current_time = int(time.time())
return abs(current_time - timestamp) <= TimestampValidator.MAX_SKEW


# 结合时间戳和签名
@app.route('/api/action', methods=['POST'])
def protected_action():
timestamp = int(request.headers.get('X-Timestamp', 0))
signature = request.headers.get('X-Signature')

# 验证时间戳
if not TimestampValidator.validate(timestamp):
return jsonify({'error': '请求已过期'}), 400

# 验证签名
payload = f"{request.method}:{request.path}:{timestamp}:{request.get_data(as_text=True)}"
expected_signature = hmac.new(
SECRET_KEY.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()

if not secrets.compare_digest(signature, expected_signature):
return jsonify({'error': '签名验证失败'}), 401

# 处理请求...

JWT 相关攻击

1. 算法混淆攻击

攻击者将 JWT 的算法改为 none,绕过签名验证。

# 错误示例:信任 JWT 中的算法声明
decoded = jwt.decode(token, key) # 危险!


# 正确示例:明确指定允许的算法
decoded = jwt.decode(token, key, algorithms=['HS256', 'RS256'])

2. 密钥混淆攻击

攻击者将 RS256(非对称)改为 HS256(对称),并使用公钥作为 HMAC 密钥。

# 防御:分离公钥和私钥的使用场景
from jwt import PyJWKClient

# 使用 JWKS 端点获取公钥
jwks_client = PyJWKClient('https://auth.example.com/.well-known/jwks.json')

def verify_jwt_rs256(token: str):
signing_key = jwks_client.get_signing_key_from_jwt(token)

# 明确只接受 RS256
return jwt.decode(
token,
signing_key.key,
algorithms=['RS256'], # 只接受 RS256
audience='my-api',
issuer='https://auth.example.com',
)

3. 敏感信息泄露

JWT 载荷只经过 Base64 编码,任何人都能读取。

# 错误:在 JWT 中存储敏感信息
token = jwt.encode({
'user_id': 123,
'password': 'secret123', # 危险!
'credit_card': '4111...', # 危险!
}, key)


# 正确:只存储必要的标识符
token = jwt.encode({
'sub': 'user_123',
'role': 'user',
'exp': datetime.utcnow() + timedelta(minutes=15),
}, key)

社会工程攻击

钓鱼攻击防御

虽然技术无法完全阻止钓鱼攻击,但可以通过以下方式降低风险:

1. 品牌一致性

# 在邮件中包含明确的品牌标识和安全提示
def send_password_reset_email(email: str, token: str):
reset_url = f"https://example.com/reset-password?token={token}"

html = f"""
<div style="max-width: 600px; margin: 0 auto;">
<div style="background: #1a73e8; color: white; padding: 20px;">
<h1>Example App</h1>
</div>

<div style="padding: 20px;">
<p>您好,</p>
<p>我们收到了重置您 Example App 账户密码的请求。</p>

<p style="margin: 30px 0;">
<a href="{reset_url}"
style="background: #1a73e8; color: white; padding: 12px 24px;
text-decoration: none; border-radius: 4px;">
重置密码
</a>
</p>

<p>或复制以下链接到浏览器:</p>
<p style="word-break: break-all; background: #f5f5f5; padding: 10px;">
{reset_url}
</p>

<hr style="margin: 30px 0;">

<p style="color: #666; font-size: 14px;">
<strong>安全提示:</strong><br>
• Example App 永远不会在邮件中询问您的密码<br>
• 如果您没有请求重置密码,请忽略此邮件<br>
• 重置链接将在 15 分钟后失效
</p>

<p style="color: #999; font-size: 12px;">
此邮件由系统自动发送,请勿回复。<br>
© 2024 Example App. 保留所有权利。
</p>
</div>
</div>
"""

send_email(to=email, subject='重置您的 Example App 密码', html=html)

2. 域名保护

# 检测可疑域名注册(需要定期运行)
import whois

def check_similar_domains(brand_name: str):
"""检查相似的域名是否被注册"""
tlds = ['.com', '.net', '.org', '.io']
similar_patterns = [
f'{brand_name}-login',
f'{brand_name}-secure',
f'{brand_name}-auth',
f'get{brand_name}',
f'my{brand_name}',
]

suspicious = []
for pattern in similar_patterns:
for tld in tlds:
domain = f"{pattern}{tld}"
try:
w = whois.whois(domain)
if w.domain_name:
suspicious.append({
'domain': domain,
'registrar': w.registrar,
'created': w.creation_date,
})
except:
pass

return suspicious

3. 用户教育

在应用内展示安全提示:

// 登录成功后显示安全提示
function showSecurityReminder() {
if (!localStorage.getItem('security_reminder_shown')) {
showNotification({
title: '安全提醒',
message: `
我们永远不会:
• 通过邮件或电话询问您的密码
• 要求您下载软件
• 要求您提供验证码

如果您收到此类请求,请立即举报。
`,
type: 'info',
});
localStorage.setItem('security_reminder_shown', Date.now());
}
}

安全监控与响应

异常检测

from collections import defaultdict
from datetime import datetime, timedelta

class SecurityMonitor:
def __init__(self, redis_client):
self.redis = redis_client

def detect_anomaly(self, user_id: str, action: str, context: dict) -> dict:
"""检测异常行为"""
anomalies = []

# 1. 检测新设备
device_id = self._get_device_id(context)
known_devices = self.redis.smembers(f"user:{user_id}:devices")
if device_id not in known_devices:
anomalies.append({
'type': 'new_device',
'severity': 'medium',
'details': {'device_id': device_id},
})

# 2. 检测新位置
location = context.get('location')
recent_locations = self.redis.lrange(f"user:{user_id}:locations", 0, 9)
if location and location.encode() not in recent_locations:
anomalies.append({
'type': 'new_location',
'severity': 'medium',
'details': {'location': location},
})

# 3. 检测异常时间
hour = datetime.utcnow().hour
typical_hours = self._get_typical_activity_hours(user_id)
if hour not in typical_hours:
anomalies.append({
'type': 'unusual_time',
'severity': 'low',
'details': {'hour': hour},
})

return {
'has_anomaly': len(anomalies) > 0,
'anomalies': anomalies,
'risk_level': max((a['severity'] for a in anomalies), default='none'),
}

def _get_device_id(self, context: dict) -> str:
"""生成设备指纹"""
import hashlib
components = [
context.get('user_agent', ''),
context.get('screen_resolution', ''),
context.get('timezone', ''),
]
return hashlib.sha256('|'.join(components).encode()).hexdigest()[:16]

def _get_typical_activity_hours(self, user_id: str) -> set:
"""获取用户典型活动时间"""
key = f"user:{user_id}:activity_hours"
hours = self.redis.smembers(key)
return set(int(h) for h in hours) if hours else set(range(24))

安全事件日志

import json
from datetime import datetime

class SecurityEventLogger:
EVENTS = {
'LOGIN_SUCCESS': 'info',
'LOGIN_FAILED': 'warning',
'ACCOUNT_LOCKED': 'warning',
'PASSWORD_CHANGED': 'info',
'MFA_ENABLED': 'info',
'MFA_DISABLED': 'warning',
'SUSPICIOUS_ACTIVITY': 'critical',
'SESSION_HIJACK_ATTEMPT': 'critical',
}

@staticmethod
def log(event_type: str, user_id: str = None, details: dict = None, request=None):
"""记录安全事件"""
severity = SecurityEventLogger.EVENTS.get(event_type, 'info')

event = {
'event_type': event_type,
'severity': severity,
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'details': details or {},
}

if request:
event['ip'] = request.remote_addr
event['user_agent'] = request.headers.get('User-Agent')
event['path'] = request.path

# 写入日志
logger.log(json.dumps(event), level=severity)

# 高危事件触发告警
if severity in ['critical', 'warning']:
SecurityEventLogger._alert(event)

@staticmethod
def _alert(event: dict):
"""发送安全告警"""
# 发送到监控系统(如 Prometheus, ELK)
# 或发送邮件/短信通知管理员
pass


# 使用示例
SecurityEventLogger.log(
event_type='LOGIN_FAILED',
user_id='user_123',
details={'reason': 'invalid_password', 'attempts': 3},
request=request,
)

防御措施总结

攻击类型核心防御措施
暴力破解账户锁定、速率限制、CAPTCHA、MFA
撞库攻击泄露检测、IP 信誉、MFA
密码喷洒全局速率限制、IP 限制
会话劫持安全 Cookie、会话绑定、HTTPS
会话固定登录后重新生成会话 ID
CSRFSameSite Cookie、CSRF Token
重放攻击Nonce、时间戳验证
JWT 攻击算法白名单、密钥分离
钓鱼攻击用户教育、品牌一致性、域名监控

小结

本章学习了认证系统中常见的安全攻击及其防御策略:

  1. 凭据攻击:暴力破解、撞库、密码喷洒及对应防御
  2. 会话攻击:劫持、固定、CSRF 的原理和防御
  3. 协议攻击:重放攻击、JWT 攻击的防御实现
  4. 社会工程:钓鱼攻击的预防措施
  5. 安全监控:异常检测和事件日志

安全是一个持续的过程,需要在设计和实现阶段就考虑防御措施,并在运行时持续监控和响应。

练习

  1. 实现一个完整的账户锁定机制,支持指数退避
  2. 实现 CSRF Token 的生成和验证
  3. 设计一个安全事件监控系统

参考资料