用户认证
用户认证是 Web 应用安全的核心组成部分。无论你构建的是博客、电商平台还是企业应用,都需要确认"当前用户是谁"以及"他能做什么"。本章将深入探讨 Flask 中的用户认证机制,从传统的会话认证到现代的 JWT 认证,帮助你构建安全可靠的身份验证系统。
理解认证与授权
在深入技术细节之前,我们需要先理解两个经常被混淆的概念。
认证(Authentication)
认证是验证"你是谁"的过程。系统需要确认用户声明的身份是否真实,通常通过以下方式:
- 知识因素:密码、PIN 码、安全问题
- 拥有因素:手机验证码、硬件令牌、邮件链接
- 生物因素:指纹、面容、虹膜
最常见的认证方式是用户名+密码,系统验证密码正确后确认用户身份。
授权(Authorization)
授权是验证"你能做什么"的过程。在认证确认身份后,授权决定用户可以访问哪些资源、执行哪些操作。授权通常基于:
- 角色(Role):管理员、普通用户、访客
- 权限(Permission):创建文章、删除用户、查看报表
- 策略(Policy):只能访问自己创建的资源
两者的关系可以概括为:认证解决"你是谁",授权解决"你能做什么"。认证是授权的前提,但授权不一定依赖认证(如公开资源的访问)。
认证流程概览
一个完整的认证流程通常包含以下步骤:
用户提交凭证 → 验证凭证 → 创建会话/签发令牌 → 后续请求携带凭证 → 验证凭证 → 允许访问
Flask-Login 概述
Flask-Login 是 Flask 生态系统中最受欢迎的用户会话管理扩展,它提供了:
- 会话管理:自动处理用户登录状态的存储和验证
- 路由保护:通过装饰器轻松保护需要登录的路由
- 记住我功能:支持持久化登录状态
- 会话保护:防止会话被劫持
- 用户加载:自动从会话中恢复用户对象
为什么选择 Flask-Login
对于传统的 Web 应用(服务端渲染页面),Flask-Login 是最佳选择:
| 特性 | Flask-Login | JWT |
|---|---|---|
| 存储位置 | 服务端会话 | 客户端令牌 |
| 状态管理 | 有状态 | 无状态 |
| 注销机制 | 简单直接 | 需要额外处理 |
| CSRF 防护 | 内置支持 | 需要额外处理 |
| 适用场景 | 传统 Web 应用 | API、单页应用 |
安装与配置
安装依赖
pip install flask-login
对于密码处理,我们使用 Werkzeug 内置的安全函数(Flask 依赖 Werkzeug,无需额外安装):
# Werkzeug 已经作为 Flask 的依赖安装
pip install flask # 这会自动安装 werkzeug
基本配置
from flask import Flask
from flask_login import LoginManager
app = Flask(__name__)
# 必须设置 SECRET_KEY,用于签名会话 Cookie
app.config['SECRET_KEY'] = 'your-secret-key-here'
# 创建 LoginManager 实例
login_manager = LoginManager()
# 初始化扩展
login_manager.init_app(app)
# 配置登录视图(未登录用户重定向到此)
login_manager.login_view = 'auth.login'
# 配置登录提示消息
login_manager.login_message = '请先登录后访问此页面'
# 配置登录消息类别(用于前端样式)
login_manager.login_message_category = 'warning'
# 配置会话保护级别
# 'strong':IP 和 User-Agent 变化时清除会话
# 'basic':仅检查 IP 变化
# None:禁用会话保护
login_manager.session_protection = 'strong'
配置选项详解:
| 选项 | 说明 |
|---|---|
login_view | 未登录用户重定向的路由端点 |
login_message | 重定向时显示的提示消息 |
login_message_category | 消息的 CSS 类别,用于前端样式 |
session_protection | 会话保护级别 |
refresh_view | 需要重新认证时重定向的视图 |
用户模型
Flask-Login 要求用户模型实现特定的属性和方法,最简单的方式是继承 UserMixin 类。
UserMixin 提供的方法
UserMixin 类实现了以下必需方法:
class UserMixin:
def is_authenticated(self):
"""用户是否已认证(总是返回 True)"""
return True
def is_active(self):
"""用户是否激活(可返回 False 禁用账户)"""
return True
def is_anonymous(self):
"""是否是匿名用户(总是返回 False)"""
return False
def get_id(self):
"""返回用户唯一标识符(必须是字符串)"""
return str(self.id)
定义用户模型
from datetime import datetime
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from app import db
class User(UserMixin, db.Model):
"""用户模型"""
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(256), nullable=False) # 哈希后的密码
# 用户状态
is_active = db.Column(db.Boolean, default=True)
is_admin = db.Column(db.Boolean, default=False)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime)
# 个人信息
nickname = db.Column(db.String(80))
avatar = db.Column(db.String(200))
bio = db.Column(db.Text)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic')
def set_password(self, password):
"""设置密码(存储哈希值,不存储明文)"""
self.password_hash = generate_password_hash(password)
def check_password(self, password):
"""验证密码"""
return check_password_hash(self.password_hash, password)
def __repr__(self):
return f'<User {self.username}>'
def to_dict(self):
"""转换为字典(用于 API 响应)"""
return {
'id': self.id,
'username': self.username,
'email': self.email,
'nickname': self.nickname,
'is_admin': self.is_admin,
'created_at': self.created_at.isoformat() if self.created_at else None
}
为什么不在数据库中存储明文密码?
即使数据库被泄露,攻击者也无法直接获取用户密码。密码哈希是单向的,无法逆向还原原始密码。这是安全的基本原则:永远不要存储用户的明文密码。
用户加载回调
Flask-Login 需要知道如何从会话中加载用户对象。通过注册用户加载回调函数实现:
from flask_login import current_user
from app.models import User
@login_manager.user_loader
def load_user(user_id):
"""
根据 user_id 加载用户对象
参数 user_id 是字符串类型(来自会话中的 str(user.id))
返回用户对象或 None(用户不存在时)
"""
return User.query.get(int(user_id))
这个回调函数会在每次请求时被调用,用于从会话中恢复用户对象。如果返回 None,表示用户不存在或会话无效,用户会被视为未登录状态。
密码安全
密码哈希原理
密码哈希不是加密。加密是可逆的(有密钥可以解密),而哈希是单向的。好的密码哈希算法具有以下特点:
- 单向性:无法从哈希值还原原始密码
- 确定性:相同输入产生相同输出
- 雪崩效应:输入微小变化导致输出完全不同
- 慢速:增加暴力破解的成本
- 加盐:防止彩虹表攻击
Werkzeug 密码哈希
Werkzeug 提供了 generate_password_hash 和 check_password_hash 函数:
from werkzeug.security import generate_password_hash, check_password_hash
# 生成密码哈希
password = 'my-password-123'
password_hash = generate_password_hash(password)
# 生成的哈希值类似:
# scrypt:32768:8:1$salt$hash
# 格式:算法:参数:盐:哈希值
# 验证密码
is_valid = check_password_hash(password_hash, password) # True
is_invalid = check_password_hash(password_hash, 'wrong') # False
支持的哈希算法:
| 算法 | 说明 | 推荐场景 |
|---|---|---|
scrypt | 默认算法,内存密集型 | 生产环境(推荐) |
pbkdf2:sha256 | 迭代型哈希 | 兼容性要求时 |
# 使用 scrypt(默认)
hash1 = generate_password_hash('password', method='scrypt')
# 使用 pbkdf2(指定迭代次数)
hash2 = generate_password_hash('password', method='pbkdf2:sha256', salt_length=16)
# 注意:旧版本的 Werkzeug 支持更多算法,但已被弃用
# 不要使用:sha1, md5, sha224, sha512 等简单哈希
密码强度验证
在设置密码时,应该验证密码强度:
import re
def validate_password_strength(password):
"""
验证密码强度
返回: (is_valid, error_message)
"""
if len(password) < 8:
return False, '密码长度至少 8 个字符'
if len(password) > 128:
return False, '密码长度不能超过 128 个字符'
if not re.search(r'[A-Z]', password):
return False, '密码必须包含至少一个大写字母'
if not re.search(r'[a-z]', password):
return False, '密码必须包含至少一个小写字母'
if not re.search(r'\d', password):
return False, '密码必须包含至少一个数字'
# 检查常见弱密码
common_passwords = [
'password', 'Password1', '12345678', 'Qwerty123',
'admin123', 'letmein', 'welcome', 'monkey'
]
if password.lower() in [p.lower() for p in common_passwords]:
return False, '密码过于简单,请使用更复杂的密码'
return True, None
登录与登出
登录流程
from flask import Blueprint, render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, login_required, current_user
from urllib.parse import urlparse
from app.forms import LoginForm
from app.models import User
from app import db
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
# 已登录用户直接跳转
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = LoginForm()
if form.validate_on_submit():
# 查找用户
user = User.query.filter_by(username=form.username.data).first()
# 验证用户和密码
if user is None or not user.check_password(form.password.data):
flash('用户名或密码错误', 'danger')
return redirect(url_for('auth.login'))
# 检查账户是否激活
if not user.is_active:
flash('账户已被禁用,请联系管理员', 'warning')
return redirect(url_for('auth.login'))
# 执行登录
login_user(user, remember=form.remember_me.data)
# 更新最后登录时间
user.last_login = datetime.utcnow()
db.session.commit()
# 安全重定向
next_page = request.args.get('next')
if next_page:
# 验证 next_page 是否为安全 URL(防止开放重定向攻击)
parsed = urlparse(next_page)
if parsed.netloc != '' and parsed.netloc != request.host:
next_page = None
flash(f'欢迎回来,{user.username}!', 'success')
return redirect(next_page or url_for('main.index'))
return render_template('auth/login.html', form=form)
login_user 函数详解
login_user(user, remember=False, duration=None, force=False, fresh=True)
| 参数 | 说明 |
|---|---|
user | 用户对象(必须实现 UserMixin) |
remember | 是否启用"记住我"功能 |
duration | 记住我 Cookie 的有效期(timedelta) |
force | 强制登录,忽略 is_active 检查 |
fresh | 是否标记为"新鲜"登录 |
登出流程
@auth_bp.route('/logout')
@login_required
def logout():
"""用户登出"""
logout_user()
flash('您已成功退出登录', 'info')
return redirect(url_for('main.index'))
logout_user() 会清除用户会话和记住我 Cookie。
注册功能
from app.forms import RegistrationForm
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = RegistrationForm()
if form.validate_on_submit():
# 检查用户名是否已存在
if User.query.filter_by(username=form.username.data).first():
flash('用户名已被使用', 'danger')
return redirect(url_for('auth.register'))
# 检查邮箱是否已存在
if User.query.filter_by(email=form.email.data).first():
flash('邮箱已被注册', 'danger')
return redirect(url_for('auth.register'))
# 创建用户
user = User(
username=form.username.data,
email=form.email.data
)
user.set_password(form.password.data)
db.session.add(user)
db.session.commit()
flash('注册成功!请登录', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/register.html', form=form)
注册表单
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, SubmitField
from wtforms.validators import DataRequired, Email, Length, EqualTo, Regexp
class RegistrationForm(FlaskForm):
username = StringField('用户名', validators=[
DataRequired(message='请输入用户名'),
Length(3, 20, message='用户名长度 3-20 个字符'),
Regexp(r'^[a-zA-Z][a-zA-Z0-9_]*$', message='用户名必须以字母开头,只能包含字母、数字和下划线')
])
email = StringField('邮箱', validators=[
DataRequired(message='请输入邮箱'),
Email(message='请输入有效的邮箱地址')
])
password = PasswordField('密码', validators=[
DataRequired(message='请输入密码'),
Length(8, 128, message='密码长度 8-128 个字符')
])
confirm = PasswordField('确认密码', validators=[
DataRequired(message='请再次输入密码'),
EqualTo('password', message='两次输入的密码不一致')
])
submit = SubmitField('注册')
保护路由
使用 login_required 装饰器
from flask_login import login_required
@app.route('/profile')
@login_required
def profile():
"""用户资料页面(需要登录)"""
return render_template('profile.html', user=current_user)
@app.route('/settings')
@login_required
def settings():
"""设置页面(需要登录)"""
return render_template('settings.html')
当未登录用户访问被保护的路由时:
- 请求被拦截
- 用户被重定向到
login_manager.login_view指定的登录页面 - 显示
login_manager.login_message提示消息 - 原始 URL 被保存到
next参数中,登录后可跳转回去
current_user 对象
current_user 是一个代理对象,代表当前登录的用户:
from flask_login import current_user
@app.route('/welcome')
def welcome():
if current_user.is_authenticated:
return f'欢迎,{current_user.username}!'
else:
return '欢迎,访客!请登录'
# 在模板中使用
# {% if current_user.is_authenticated %}
# <p>欢迎,{{ current_user.username }}!</p>
# {% endif %}
current_user 的属性:
| 属性/方法 | 说明 |
|---|---|
is_authenticated | 是否已认证(True/False) |
is_active | 是否激活(True/False) |
is_anonymous | 是否匿名用户(True/False) |
get_id() | 获取用户 ID |
| 自定义属性 | 用户模型定义的其他属性 |
角色和权限控制
Flask-Login 只处理认证,授权需要自己实现。以下是常用的授权装饰器:
from functools import wraps
from flask import abort
from flask_login import current_user
def admin_required(f):
"""管理员权限装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return redirect(url_for('auth.login'))
if not current_user.is_admin:
abort(403) # Forbidden
return f(*args, **kwargs)
return decorated_function
def role_required(role):
"""角色检查装饰器工厂"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return redirect(url_for('auth.login'))
if current_user.role != role:
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def permission_required(permission):
"""权限检查装饰器工厂"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
return redirect(url_for('auth.login'))
if not current_user.has_permission(permission):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
# 使用示例
@app.route('/admin/dashboard')
@login_required
@admin_required
def admin_dashboard():
return render_template('admin/dashboard.html')
@app.route('/editor/posts')
@login_required
@role_required('editor')
def editor_posts():
return render_template('editor/posts.html')
记住我功能
"记住我"功能让用户在关闭浏览器后仍然保持登录状态,通过持久化 Cookie 实现。
工作原理
- 用户勾选"记住我"登录
- Flask-Login 在会话 Cookie 之外,额外设置一个持久化 Cookie
- 当会话过期后,Flask-Login 通过持久化 Cookie 恢复用户登录状态
- 用户无需重新输入密码即可继续访问
配置记住我
from datetime import timedelta
# 配置记住我 Cookie 有效期
app.config['REMEMBER_COOKIE_NAME'] = 'remember_token'
app.config['REMEMBER_COOKIE_DURATION'] = timedelta(days=30)
app.config['REMEMBER_COOKIE_SECURE'] = True # 仅 HTTPS 传输
app.config['REMEMBER_COOKIE_HTTPONLY'] = True # 防止 JS 访问
app.config['REMEMBER_COOKIE_SAMESITE'] = 'Lax' # CSRF 防护
使用记住我
# 登录时启用记住我
login_user(user, remember=True)
# 或根据用户选择
login_user(user, remember=form.remember_me.data)
# 指定自定义有效期
from datetime import timedelta
login_user(user, remember=True, duration=timedelta(days=7))
会话新鲜度
"新鲜"登录指用户刚刚输入过密码,非"新鲜"登录指通过记住我 Cookie 恢复的登录。对于敏感操作,可以要求用户重新认证:
from flask_login import fresh_login_required, login_fresh
@app.route('/change-password')
@fresh_login_required # 必须是新鲜登录
def change_password():
return render_template('change_password.html')
@app.route('/sensitive-action')
def sensitive_action():
if not login_fresh():
# 需要重新认证
return redirect(url_for('auth.reauthenticate'))
# 执行敏感操作
pass
重新认证视图:
@auth_bp.route('/reauthenticate', methods=['GET', 'POST'])
@login_required
def reauthenticate():
"""重新认证(用于敏感操作前)"""
form = ReauthenticateForm()
if form.validate_on_submit():
if current_user.check_password(form.password.data):
# 刷新会话,标记为新鲜
confirm_login()
flash('已重新验证', 'success')
next_page = request.args.get('next')
return redirect(next_page or url_for('main.index'))
flash('密码错误', 'danger')
return render_template('auth/reauthenticate.html', form=form)
会话保护
Flask-Login 提供会话保护机制,防止会话被劫持。
保护级别
# 强保护(推荐)
# IP 或 User-Agent 变化时,会话被标记为非新鲜
# 两者都变化时,会话被清除
login_manager.session_protection = 'strong'
# 基本保护
# 仅检查 IP 变化
login_manager.session_protection = 'basic'
# 禁用保护
login_manager.session_protection = None
会话被篡改时的行为
当检测到会话异常时:
-
strong 模式:
- IP 或 User-Agent 单一变化:会话标记为非新鲜
- 两者都变化:会话被清除,用户需要重新登录
-
basic 模式:
- IP 变化:会话被清除
自定义会话保护行为
@login_manager.session_protected
def on_session_protected():
"""会话保护触发时的回调"""
app.logger.warning(
f'Session protection triggered for user {current_user.id}'
)
JWT 认证
对于 API 或单页应用(SPA),传统的会话认证可能不太合适。JWT(JSON Web Token)是一种无状态的认证方案。
JWT vs 会话认证
| 特性 | 会话认证 | JWT 认证 |
|---|---|---|
| 状态 | 有状态(服务端存储) | 无状态(自包含) |
| 扩展性 | 需要共享会话存储 | 天然支持分布式 |
| 注销 | 立即生效 | 需要额外机制 |
| 安全性 | Cookie 易受 CSRF | Token 易受 XSS |
| 存储 | Cookie | LocalStorage / Cookie |
使用 Flask-JWT-Extended
安装扩展:
pip install flask-jwt-extended
配置:
from flask import Flask
from flask_jwt_extended import JWTManager
from datetime import timedelta
app = Flask(__name__)
# JWT 配置
app.config['JWT_SECRET_KEY'] = 'your-jwt-secret-key'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30)
app.config['JWT_TOKEN_LOCATION'] = ['headers'] # 或 ['cookies']
app.config['JWT_COOKIE_SECURE'] = True
app.config['JWT_COOKIE_CSRF_PROTECT'] = True
jwt = JWTManager(app)
生成和验证 Token:
from flask import jsonify, request
from flask_jwt_extended import (
create_access_token,
create_refresh_token,
jwt_required,
get_jwt_identity,
get_jwt
)
@app.route('/api/login', methods=['POST'])
def api_login():
"""API 登录,返回 JWT"""
data = request.get_json()
username = data.get('username')
password = data.get('password')
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
# 创建访问令牌和刷新令牌
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
return jsonify({
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
}), 200
return jsonify({'error': '用户名或密码错误'}), 401
@app.route('/api/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
"""刷新访问令牌"""
user_id = get_jwt_identity()
access_token = create_access_token(identity=user_id)
return jsonify({'access_token': access_token}), 200
@app.route('/api/profile')
@jwt_required()
def api_profile():
"""受保护的 API 端点"""
user_id = get_jwt_identity()
user = User.query.get(user_id)
return jsonify(user.to_dict()), 200
在 Token 中存储额外信息
from flask_jwt_extended import create_access_token
# 创建 Token 时添加额外声明
additional_claims = {
'is_admin': user.is_admin,
'roles': ['user', 'editor']
}
access_token = create_access_token(
identity=user.id,
additional_claims=additional_claims
)
# 在受保护的路由中获取声明
@app.route('/api/admin')
@jwt_required()
def admin_api():
claims = get_jwt()
if not claims.get('is_admin'):
return jsonify({'error': '需要管理员权限'}), 403
return jsonify({'message': 'Admin access granted'}), 200
JWT 注销机制
JWT 的无状态特性使得注销变得复杂,常见方案:
# 方案一:维护黑名单(需要 Redis 或数据库)
from datetime import datetime
class TokenBlacklist(db.Model):
id = db.Column(db.Integer, primary_key=True)
jti = db.Column(db.String(36), unique=True, nullable=True) # JWT ID
token_type = db.Column(db.String(10))
revoked = db.Column(db.Boolean, default=True)
revoked_at = db.Column(db.DateTime, default=datetime.utcnow)
@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
jti = jwt_payload['jti']
token = TokenBlacklist.query.filter_by(jti=jti).first()
return token is not None and token.revoked
@app.route('/api/logout', methods=['POST'])
@jwt_required()
def api_logout():
"""注销(将 Token 加入黑名单)"""
jti = get_jwt()['jti']
token_type = get_jwt()['type']
blacklisted = TokenBlacklist(jti=jti, token_type=token_type)
db.session.add(blacklisted)
db.session.commit()
return jsonify({'message': '已注销'}), 200
密码重置
密码重置通常通过邮件发送一次性链接实现。
生成重置令牌
使用 itsdangerous 库生成有时效性的签名令牌:
from itsdangerous import URLSafeTimedSerializer
from flask import current_app
def generate_password_reset_token(email):
"""生成密码重置令牌"""
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
return serializer.dumps(email, salt='password-reset')
def verify_password_reset_token(token, max_age=3600):
"""验证密码重置令牌"""
serializer = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
try:
email = serializer.loads(
token,
salt='password-reset',
max_age=max_age
)
except Exception:
return None
return email
密码重置流程
from flask_mail import Mail, Message
app.config['MAIL_SERVER'] = 'smtp.example.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = '[email protected]'
app.config['MAIL_PASSWORD'] = 'your-password'
mail = Mail(app)
@auth_bp.route('/reset-password', methods=['GET', 'POST'])
def reset_request():
"""请求密码重置"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = ResetRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data).first()
if user:
# 生成令牌
token = generate_password_reset_token(user.email)
# 发送邮件
reset_url = url_for(
'auth.reset_token',
token=token,
_external=True
)
msg = Message(
'密码重置',
sender='[email protected]',
recipients=[user.email]
)
msg.body = f'''请点击以下链接重置密码:
{reset_url}
如果您没有请求密码重置,请忽略此邮件。
链接有效期 1 小时。
'''
mail.send(msg)
# 即使用户不存在也显示成功消息(防止用户枚举)
flash('密码重置邮件已发送,请查收', 'info')
return redirect(url_for('auth.login'))
return render_template('auth/reset_request.html', form=form)
@auth_bp.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_token(token):
"""重置密码"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
# 验证令牌
email = verify_password_reset_token(token)
if email is None:
flash('链接无效或已过期', 'warning')
return redirect(url_for('auth.reset_request'))
user = User.query.filter_by(email=email).first()
if user is None:
flash('用户不存在', 'warning')
return redirect(url_for('auth.reset_request'))
form = ResetPasswordForm()
if form.validate_on_submit():
user.set_password(form.password.data)
db.session.commit()
flash('密码已重置,请登录', 'success')
return redirect(url_for('auth.login'))
return render_template('auth/reset_token.html', form=form)
安全最佳实践
密码安全
- 永远不要存储明文密码
- 使用强哈希算法(scrypt 或 pbkdf2)
- 验证密码强度
- 防止暴力破解:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@auth_bp.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute") # 登录限制更严格
def login():
# ...
会话安全
# 会话配置
app.config['SESSION_COOKIE_SECURE'] = True # 仅 HTTPS
app.config['SESSION_COOKIE_HTTPONLY'] = True # 防止 JS 访问
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax' # CSRF 防护
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=1)
HTTPS
生产环境必须使用 HTTPS:
# 强制 HTTPS
from flask_talisman import Talisman
Talisman(app, force_https=True)
安全响应头
@app.after_request
def add_security_headers(response):
"""添加安全响应头"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
return response
防止用户枚举
登录和注册时,不要透露用户是否存在:
# 不好的做法
if user is None:
flash('用户不存在', 'error')
elif not user.check_password(password):
flash('密码错误', 'error')
# 好的做法
if user is None or not user.check_password(password):
flash('用户名或密码错误', 'error')
小结
本章深入探讨了 Flask 用户认证的各个方面:
- 认证与授权:理解两者区别,认证验证身份,授权控制访问
- Flask-Login:会话管理、用户加载、路由保护、记住我功能
- 密码安全:哈希存储、强度验证、暴力破解防护
- JWT 认证:无状态认证、令牌管理、API 认证
- 密码重置:安全令牌、邮件发送
- 安全最佳实践:HTTPS、安全响应头、会话保护
认证是 Web 应用安全的基础,正确实现认证机制对保护用户数据至关重要。