多因素认证 (MFA):构建纵深防御体系
多因素认证(Multi-Factor Authentication,MFA)要求用户提供两个或更多独立的认证因素,显著提升账户安全性。即使密码泄露,攻击者仍需突破第二道防线才能入侵账户。
核心概念:认证因素分类
MFA 的核心思想是组合不同类型的认证因素:
三大因素类型
| 因素类型 | 说明 | 示例 |
|---|---|---|
| 所知 (Something you know) | 用户知道的信息 | 密码、PIN 码、安全问题答案 |
| 所有 (Something you have) | 用户拥有的设备 | 手机、硬件令牌、智能卡 |
| 所是 (Something you are) | 用户的生物特征 | 指纹、面容、虹膜 |
因素组合原则
有效的 MFA 必须使用不同类型的因素:
✓ 有效组合:密码 + 手机验证码
✗ 无效组合:密码 + 安全问题(都是"所知")
✓ 有效组合:密码 + TOTP
✓ 有效组合:密码 + 指纹
✓ 有效组合:密码 + 硬件密钥
TOTP:基于时间的一次性密码
TOTP(Time-based One-Time Password)是最广泛使用的 MFA 方式,每 30 秒生成一个 6 位数字验证码。
工作原理
TOTP 基于 RFC 6238 标准,使用共享密钥和当前时间计算验证码:
其中:
- :共享密钥(Base32 编码)
- :当前 Unix 时间戳
- :起始时间(通常为 0)
- :时间步长(通常为 30 秒)
共享密钥 (Secret)
│
↓
┌─────────────┐
│ 时间戳 │ ──→ 当前时间戳 / 30
└─────────────┘
│
↓
┌─────────────┐
│ HMAC-SHA1 │ ──→ 使用密钥对计数器进行 HMAC
└─────────────┘
│
↓
┌─────────────┐
│ 截取 │ ──→ 从哈希结果中提取 6 位数字
└─────────────┘
│
↓
TOTP 验证码 (如 123456)
服务端实现
密钥生成
// Node.js - 生成 TOTP 密钥
const crypto = require('crypto');
const base32 = require('base32-encode');
function generateSecret() {
// 生成 20 字节随机密钥
const buffer = crypto.randomBytes(20);
// Base32 编码(兼容 Google Authenticator)
return base32(buffer, 'RFC4648');
}
// 生成 otpauth:// URL(用于二维码)
function generateOtpAuthUrl(secret, email, issuer) {
const encodedIssuer = encodeURIComponent(issuer);
const encodedEmail = encodeURIComponent(email);
return `otpauth://totp/${encodedIssuer}:${encodedEmail}?secret=${secret}&issuer=${encodedIssuer}&algorithm=SHA1&digits=6&period=30`;
}
TOTP 验证
// Node.js - TOTP 验证
const crypto = require('crypto');
const base32 = require('base32-decode');
function verifyTOTP(secret, token, window = 1) {
// Base32 解码密钥
const key = base32(secret, 'RFC4648');
// 当前时间步
const counter = Math.floor(Date.now() / 1000 / 30);
// 检查时间窗口(允许时钟偏差)
for (let i = -window; i <= window; i++) {
const expectedToken = generateTOTP(key, counter + i);
if (timingSafeEqual(token, expectedToken)) {
return true;
}
}
return false;
}
function generateTOTP(key, counter) {
// 将计数器转换为大端字节数组
const buffer = Buffer.alloc(8);
buffer.writeBigUInt64BE(BigInt(counter));
// HMAC-SHA1
const hmac = crypto.createHmac('sha1', key);
hmac.update(buffer);
const hash = hmac.digest();
// 动态截取
const offset = hash[hash.length - 1] & 0x0f;
const code = ((hash[offset] & 0x7f) << 24 |
(hash[offset + 1] & 0xff) << 16 |
(hash[offset + 2] & 0xff) << 8 |
(hash[offset + 3] & 0xff)) % 1000000;
return code.toString().padStart(6, '0');
}
// 恒定时间比较(防止计时攻击)
function timingSafeEqual(a, b) {
if (a.length !== b.length) return false;
return crypto.timingSafeEqual(Buffer.from(a), Buffer.from(b));
}
完整服务端 API
// Express.js - TOTP 设置和验证 API
const express = require('express');
const QRCode = require('qrcode');
const router = express.Router();
// 生成 TOTP 设置二维码
router.get('/setup', authenticateUser, async (req, res) => {
// 检查是否已启用 TOTP
if (req.user.totpEnabled) {
return res.status(400).json({ error: 'TOTP 已启用' });
}
// 生成密钥
const secret = generateSecret();
// 生成 otpauth URL
const otpauthUrl = generateOtpAuthUrl(
secret,
req.user.email,
'MyApp'
);
// 生成二维码
const qrCode = await QRCode.toDataURL(otpauthUrl);
// 临时保存密钥(验证成功后才正式启用)
await redis.setex(`totp:temp:${req.user.id}`, 300, secret);
res.json({
secret,
qrCode,
manualEntryKey: secret,
});
});
// 验证并启用 TOTP
router.post('/enable', authenticateUser, async (req, res) => {
const { token } = req.body;
// 获取临时密钥
const secret = await redis.get(`totp:temp:${req.user.id}`);
if (!secret) {
return res.status(400).json({ error: '设置已过期,请重新开始' });
}
// 验证 TOTP
if (!verifyTOTP(secret, token)) {
return res.status(400).json({ error: '验证码错误' });
}
// 生成备份码
const backupCodes = generateBackupCodes();
// 保存到数据库
await User.update(req.user.id, {
totpSecret: secret,
totpEnabled: true,
backupCodes: hashBackupCodes(backupCodes),
});
// 清除临时密钥
await redis.del(`totp:temp:${req.user.id}`);
res.json({
success: true,
backupCodes, // 只返回一次!
message: 'TOTP 已启用,请妥善保存备份码',
});
});
// TOTP 验证(登录时)
router.post('/verify', authenticateUser, async (req, res) => {
const { token } = req.body;
if (!req.user.totpEnabled) {
return res.status(400).json({ error: 'TOTP 未启用' });
}
// 检查是否为备份码
if (token.length === 8 && /^[A-Z0-9]{8}$/.test(token)) {
const isValidBackup = await verifyBackupCode(req.user, token);
if (isValidBackup) {
return res.json({ success: true });
}
return res.status(400).json({ error: '备份码无效或已使用' });
}
// 验证 TOTP
if (!verifyTOTP(req.user.totpSecret, token)) {
return res.status(400).json({ error: '验证码错误' });
}
res.json({ success: true });
});
// 生成备份码
function generateBackupCodes() {
const codes = [];
for (let i = 0; i < 10; i++) {
codes.push(crypto.randomBytes(4).toString('hex').toUpperCase());
}
return codes;
}
Python 实现
# Python - TOTP 实现
import hmac
import hashlib
import struct
import time
import base64
import secrets
def generate_secret() -> str:
"""生成 Base32 编码的密钥"""
return base64.b32encode(secrets.token_bytes(20)).decode('utf-8')
def generate_totp(secret: str, digits: int = 6, period: int = 30) -> str:
"""生成 TOTP 验证码"""
# Base32 解码
key = base64.b32decode(secret)
# 计算时间计数器
counter = int(time.time() // period)
# 将计数器转换为大端字节
counter_bytes = struct.pack('>Q', counter)
# HMAC-SHA1
hmac_result = hmac.new(key, counter_bytes, hashlib.sha1).digest()
# 动态截取
offset = hmac_result[-1] & 0x0f
code = struct.unpack('>I', hmac_result[offset:offset + 4])[0] & 0x7fffffff
code = code % (10 ** digits)
return str(code).zfill(digits)
def verify_totp(secret: str, token: str, window: int = 1) -> bool:
"""验证 TOTP"""
try:
token = str(token).zfill(6)
for i in range(-window, window + 1):
counter = int(time.time() // 30) + i
key = base64.b32decode(secret)
counter_bytes = struct.pack('>Q', counter)
hmac_result = hmac.new(key, counter_bytes, hashlib.sha1).digest()
offset = hmac_result[-1] & 0x0f
code = struct.unpack('>I', hmac_result[offset:offset + 4])[0] & 0x7fffffff
code = str(code % 1000000).zfill(6)
if hmac.compare_digest(token, code):
return True
return False
except Exception:
return False
# 生成 otpauth URL
def generate_otpauth_url(secret: str, email: str, issuer: str) -> str:
from urllib.parse import quote
return f"otpauth://totp/{quote(issuer)}:{quote(email)}?secret={secret}&issuer={quote(issuer)}"
Java 实现
// Java - TOTP 实现
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.math.BigInteger;
import java.security.GeneralSecurityException;
import java.util.Base64;
public class TOTP {
private static final int DIGITS = 6;
private static final int PERIOD = 30;
private static final String ALGORITHM = "HmacSHA1";
/**
* 生成随机密钥
*/
public static String generateSecret() {
byte[] bytes = new byte[20];
new java.security.SecureRandom().nextBytes(bytes);
return Base64.getEncoder().withoutPadding().encodeToString(bytes)
.replace('+', '-')
.replace('/', '_');
}
/**
* 生成 TOTP 验证码
*/
public static String generateTOTP(String secret) throws GeneralSecurityException {
byte[] key = Base64.getDecoder().decode(secret);
long counter = System.currentTimeMillis() / 1000 / PERIOD;
byte[] counterBytes = BigInteger.valueOf(counter).toByteArray();
byte[] paddedCounter = new byte[8];
int offset = 8 - counterBytes.length;
System.arraycopy(counterBytes, 0, paddedCounter, offset, counterBytes.length);
Mac mac = Mac.getInstance(ALGORITHM);
mac.init(new SecretKeySpec(key, ALGORITHM));
byte[] hash = mac.doFinal(paddedCounter);
int otp = ((hash[hash.length - 1] & 0xff) & 0x0f);
int binary = ((hash[otp] & 0x7f) << 24 |
(hash[otp + 1] & 0xff) << 16 |
(hash[otp + 2] & 0xff) << 8 |
(hash[otp + 3] & 0xff));
int code = binary % (int) Math.pow(10, DIGITS);
return String.format("%0" + DIGITS + "d", code);
}
/**
* 验证 TOTP
*/
public static boolean verify(String secret, String token, int window) {
try {
long currentCounter = System.currentTimeMillis() / 1000 / PERIOD;
for (int i = -window; i <= window; i++) {
String expected = generateTOTPAtCounter(secret, currentCounter + i);
if (timingSafeEqual(token, expected)) {
return true;
}
}
return false;
} catch (Exception e) {
return false;
}
}
private static boolean timingSafeEqual(String a, String b) {
if (a.length() != b.length()) return false;
int result = 0;
for (int i = 0; i < a.length(); i++) {
result |= a.charAt(i) ^ b.charAt(i);
}
return result == 0;
}
}
短信验证码 (SMS OTP)
短信验证码是最常见的 MFA 方式,用户体验好但安全性相对较低。
安全风险
| 风险 | 说明 |
|---|---|
| SIM 卡劫持 | 攻击者冒充受害者将手机号转移到自己控制的 SIM 卡 |
| SS7 漏洞 | 移动网络的协议漏洞可被利用拦截短信 |
| 恶意软件 | 手机上的恶意软件可读取短信 |
| 社会工程 | 攻击者诱骗用户透露验证码 |
最佳实践
// 短信验证码发送
const crypto = require('crypto');
// 生成 6 位验证码
function generateSMSCode() {
return crypto.randomInt(100000, 999999).toString();
}
// 发送验证码
async function sendSMSCode(phoneNumber) {
// 速率限制
const key = `sms:limit:${phoneNumber}`;
const count = await redis.incr(key);
if (count === 1) {
await redis.expire(key, 3600); // 1 小时窗口
}
if (count > 5) {
throw new Error('发送频率超限,请稍后再试');
}
// 生成验证码
const code = generateSMSCode();
// 存储验证码(5 分钟有效)
await redis.setex(`sms:code:${phoneNumber}`, 300, code);
// 发送短信
await smsClient.send(phoneNumber, `您的验证码是 ${code},5 分钟内有效。`);
return { success: true };
}
// 验证短信验证码
async function verifySMSCode(phoneNumber, code) {
const key = `sms:code:${phoneNumber}`;
const storedCode = await redis.get(key);
if (!storedCode) {
return { success: false, error: '验证码已过期' };
}
// 检查尝试次数
const attemptsKey = `sms:attempts:${phoneNumber}`;
const attempts = parseInt(await redis.get(attemptsKey) || '0');
if (attempts >= 3) {
return { success: false, error: '尝试次数过多,请重新获取验证码' };
}
if (!crypto.timingSafeEqual(Buffer.from(code), Buffer.from(storedCode))) {
await redis.incr(attemptsKey);
await redis.expire(attemptsKey, 300);
return { success: false, error: '验证码错误' };
}
// 验证成功,删除验证码
await redis.del(key);
await redis.del(attemptsKey);
return { success: true };
}
邮箱验证码
邮箱验证码安全性介于 TOTP 和短信之间,适合作为备选方案。
// 发送邮箱验证码
async function sendEmailCode(email) {
// 速率限制
const key = `email:limit:${email}`;
const count = await redis.incr(key);
if (count === 1) {
await redis.expire(key, 3600);
}
if (count > 10) {
throw new Error('发送频率超限');
}
// 生成验证码
const code = crypto.randomInt(100000, 999999).toString();
// 存储
await redis.setex(`email:code:${email}`, 600, code); // 10 分钟有效
// 发送邮件
await sendEmail({
to: email,
subject: '您的验证码',
html: `
<p>您的验证码是:<strong>${code}</strong></p>
<p>验证码 10 分钟内有效,请勿泄露给他人。</p>
`,
});
}
备份码
备份码是用户无法使用常规 MFA 方式时的恢复手段。
生成备份码
function generateBackupCodes(count = 10) {
const codes = [];
for (let i = 0; i < count; i++) {
// 生成 8 位字母数字码
const code = crypto.randomBytes(4).toString('hex').toUpperCase();
codes.push(code);
}
return codes;
}
// 安全存储备份码
async function storeBackupCodes(userId, codes) {
const hashedCodes = await Promise.all(
codes.map(code => bcrypt.hash(code, 10))
);
await db.query(
'UPDATE users SET backup_codes = $1 WHERE id = $2',
[JSON.stringify(hashedCodes), userId]
);
}
// 验证备份码
async function verifyBackupCode(userId, code) {
const result = await db.query(
'SELECT backup_codes FROM users WHERE id = $1',
[userId]
);
const hashedCodes = JSON.parse(result.rows[0].backup_codes);
for (let i = 0; i < hashedCodes.length; i++) {
if (await bcrypt.compare(code, hashedCodes[i])) {
// 移除已使用的备份码
hashedCodes.splice(i, 1);
await db.query(
'UPDATE users SET backup_codes = $1 WHERE id = $2',
[JSON.stringify(hashedCodes), userId]
);
return true;
}
}
return false;
}
MFA 策略设计
分层安全策略
不同敏感度的操作应使用不同级别的 MFA:
const MFA_POLICY = {
// 登录:可选 MFA
LOGIN: {
required: false,
methods: ['totp', 'sms', 'email'],
rememberDays: 30, // 记住设备 30 天
},
// 敏感操作:必须 MFA
SENSITIVE: {
required: true,
methods: ['totp', 'hardware_key'],
maxAge: 5 * 60 * 1000, // 5 分钟内有效
},
// 高危操作:必须重新认证
HIGH_RISK: {
required: true,
methods: ['totp'], // 只允许 TOTP
requireRecentAuth: true, // 必须刚完成密码认证
},
};
function checkMFARequirement(user, action) {
const policy = MFA_POLICY[action];
// 检查用户是否启用了 MFA
if (policy.required && !user.mfaEnabled) {
throw new Error('此操作需要启用 MFA');
}
// 检查最近的 MFA 认证时间
if (policy.maxAge) {
const lastMFA = user.lastMFAAuth;
if (!lastMFA || Date.now() - lastMFA > policy.maxAge) {
return { requireMFA: true };
}
}
return { requireMFA: false };
}
风险自适应认证
根据风险级别动态调整认证要求:
async function assessRisk(user, context) {
let riskScore = 0;
// 设备检查
if (!isKnownDevice(user, context.deviceFingerprint)) {
riskScore += 30;
}
// IP 检查
if (isNewLocation(user, context.ip)) {
riskScore += 20;
}
// 时间异常
if (isUnusualTime(user, context.time)) {
riskScore += 10;
}
// 行为异常
if (isAnomalousBehavior(user, context.action)) {
riskScore += 40;
}
return {
score: riskScore,
level: riskScore > 50 ? 'HIGH' : riskScore > 20 ? 'MEDIUM' : 'LOW',
};
}
async function adaptiveMFA(user, context) {
const risk = await assessRisk(user, context);
switch (risk.level) {
case 'HIGH':
// 高风险:强制 MFA + 安全提醒
return {
requireMFA: true,
methods: ['totp'],
notifyUser: true,
};
case 'MEDIUM':
// 中风险:建议 MFA
return {
requireMFA: true,
methods: ['totp', 'sms'],
};
case 'LOW':
// 低风险:跳过 MFA
return {
requireMFA: false,
};
}
}
安全最佳实践
1. 速率限制
// 防止暴力破解
const limiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 分钟
max: 5, // 最多 5 次尝试
keyGenerator: (req) => `mfa:${req.user.id}`,
handler: (req, res) => {
res.status(429).json({
error: '尝试次数过多,请 15 分钟后重试',
});
},
});
2. 审计日志
// 记录所有 MFA 事件
function logMFAEvent(userId, event, details) {
logger.info('MFA_EVENT', {
userId,
event, // 'ENABLE', 'DISABLE', 'VERIFY_SUCCESS', 'VERIFY_FAILED'
method: details.method,
ip: details.ip,
userAgent: details.userAgent,
timestamp: new Date(),
});
// 失败次数过多触发告警
if (event === 'VERIFY_FAILED') {
checkFailedAttempts(userId);
}
}
3. 恢复流程
// MFA 锁定恢复流程
async function initiateMFARecovery(userId) {
// 1. 验证用户身份(其他方式)
// 2. 设置临时恢复令牌
const recoveryToken = crypto.randomBytes(32).toString('hex');
await redis.setex(`mfa:recovery:${userId}`, 3600, recoveryToken);
// 3. 发送确认邮件
await sendRecoveryEmail(userId, recoveryToken);
}
async function confirmMFARecovery(userId, token, newPassword) {
// 1. 验证恢复令牌
const storedToken = await redis.get(`mfa:recovery:${userId}`);
if (!storedToken || storedToken !== token) {
throw new Error('恢复链接无效或已过期');
}
// 2. 重置密码和 MFA
await resetUserCredentials(userId, newPassword);
await disableMFA(userId);
// 3. 通知用户
await notifyMFADisabled(userId);
}
总结
MFA 方式对比
| 方式 | 安全性 | 便捷性 | 成本 | 适用场景 |
|---|---|---|---|---|
| TOTP | 高 | 中 | 无 | 推荐、主流选择 |
| 短信 | 中 | 高 | 有 | 用户基数大、兼容性好 |
| 邮箱 | 中 | 高 | 低 | 备选方案 |
| 硬件密钥 | 最高 | 中 | 有 | 高安全要求 |
| Passkeys | 最高 | 高 | 无 | 未来方向 |
实施建议
- 推荐 TOTP:安全性高、成本低、用户接受度好
- 提供多种选择:让用户根据自己的需求选择
- 强制备份码:启用 MFA 时必须生成备份码
- 分层策略:不同敏感度操作使用不同级别的认证
- 风险自适应:根据上下文动态调整认证要求