数据库集成
数据库是大多数 Web 应用的核心组件,负责持久化存储业务数据。Flask 本身不限制数据库选择,但 Flask-SQLAlchemy 是最流行的 ORM(对象关系映射)解决方案,它在 SQLAlchemy 的基础上提供了与 Flask 的紧密集成。
本章将深入探讨如何使用 Flask-SQLAlchemy 构建数据层,从基础配置到高级查询技巧,帮助你建立坚实的数据库知识体系。
理解 ORM 的价值
在深入技术细节之前,让我们先理解为什么 ORM 如此重要。
ORM 解决的问题
传统 SQL 编程存在以下痛点:
- SQL 注入风险:手动拼接 SQL 容易遗漏转义,导致安全漏洞
- 结果集映射繁琐:将数据库行转换为对象需要大量样板代码
- 数据库方言差异:不同数据库的 SQL 语法略有不同,移植困难
- 关系管理复杂:表之间的关联关系需要手动维护
ORM 通过以下方式解决这些问题:
# 传统 SQL 方式(不推荐)
cursor.execute(
"SELECT * FROM users WHERE username = %s",
(username,)
)
rows = cursor.fetchall()
users = [User(**row) for row in rows]
# ORM 方式(推荐)
users = User.query.filter_by(username=username).all()
SQLAlchemy 的架构
SQLAlchemy 采用分层架构:
- Core 层:提供 SQL 表达式语言、引擎管理、连接池
- ORM 层:在 Core 之上提供对象映射、会话管理、关系加载
Flask-SQLAlchemy 主要简化了 ORM 层与 Flask 的集成,如会话生命周期管理、应用工厂模式支持等。
安装与配置
安装依赖
pip install flask-sqlalchemy flask-migrate
flask-sqlalchemy:Flask 与 SQLAlchemy 的集成flask-migrate:数据库迁移工具,基于 Alembic
基本配置
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
# 数据库 URI 格式
# SQLite: sqlite:///path/to/database.db
# MySQL: mysql+pymysql://user:password@host:port/dbname
# PostgreSQL: postgresql://user:password@host:port/dbname
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
# 禁用修改追踪(节省内存)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 可选:显示 SQL 语句(开发环境)
app.config['SQLALCHEMY_ECHO'] = True
# 可选:连接池配置
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10, # 连接池大小
'pool_recycle': 3600, # 连接回收时间(秒)
'pool_pre_ping': True, # 连接健康检查
'max_overflow': 5 # 超出 pool_size 后允许的额外连接
}
db = SQLAlchemy(app)
migrate = Migrate(app, db)
应用工厂模式
对于大型应用,推荐使用应用工厂模式:
# extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()
# __init__.py
from flask import Flask
from .extensions import db, migrate
def create_app(config_class='config.Config'):
app = Flask(__name__)
app.config.from_object(config_class)
# 初始化扩展
db.init_app(app)
migrate.init_app(app, db)
# 注册蓝图
from .models import User, Post
from .views import main
app.register_blueprint(main)
return app
模型定义
基本模型
from datetime import datetime
from app import db
class User(db.Model):
"""用户模型"""
# 表名(可选,默认为类名小写)
__tablename__ = 'users'
# 主键
id = db.Column(db.Integer, primary_key=True)
# 基本字段
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128))
# 布尔字段
is_active = db.Column(db.Boolean, default=True)
is_admin = db.Column(db.Boolean, default=False)
# 时间字段
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 文本字段
bio = db.Column(db.Text)
# 关系
posts = db.relationship('Post', backref='author', lazy='dynamic')
def __repr__(self):
return f'<User {self.username}>'
def to_dict(self):
"""转换为字典(用于 API)"""
return {
'id': self.id,
'username': self.username,
'email': self.email,
'created_at': self.created_at.isoformat()
}
字段类型详解
| 类型 | Python 类型 | 数据库类型 | 说明 |
|---|---|---|---|
Integer | int | INTEGER | 整数 |
BigInteger | int | BIGINT | 大整数 |
SmallInteger | int | SMALLINT | 小整数 |
Float | float | FLOAT | 浮点数 |
Numeric | Decimal | NUMERIC | 高精度数值 |
String(n) | str | VARCHAR(n) | 变长字符串 |
Text | str | TEXT | 长文本 |
Boolean | bool | BOOLEAN | 布尔值 |
Date | date | DATE | 日期 |
DateTime | datetime | DATETIME | 日期时间 |
Time | time | TIME | 时间 |
LargeBinary | bytes | BLOB | 二进制数据 |
JSON | dict/list | JSON | JSON 数据 |
Enum | enum | ENUM | 枚举 |
列选项
class Article(db.Model):
id = db.Column(db.Integer, primary_key=True)
# nullable:是否允许空值
title = db.Column(db.String(100), nullable=False)
# default:默认值(可以是值或可调用对象)
views = db.Column(db.Integer, default=0)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# unique:唯一约束
slug = db.Column(db.String(100), unique=True)
# index:创建索引
category = db.Column(db.String(50), index=True)
# server_default:数据库层面的默认值
status = db.Column(db.String(20), server_default='draft')
# onupdate:更新时的值
updated_at = db.Column(db.DateTime, onupdate=datetime.utcnow)
主键与自增
class Product(db.Model):
# 默认自增主键
id = db.Column(db.Integer, primary_key=True)
# 自定义自增起始值
# id = db.Column(db.Integer, primary_key=True, autoincrement=1000)
# UUID 主键
# id = db.Column(db.String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
# 复合主键
# __table_args__ = (
# db.PrimaryKeyConstraint('user_id', 'role_id'),
# )
表级约束
class Order(db.Model):
__tablename__ = 'orders'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
product_id = db.Column(db.Integer, db.ForeignKey('products.id'))
__table_args__ = (
# 唯一约束
db.UniqueConstraint('user_id', 'product_id', name='unique_user_product'),
# 检查约束
db.CheckConstraint('quantity > 0', name='positive_quantity'),
# 索引
db.Index('idx_user_product', 'user_id', 'product_id'),
# 表注释
{'comment': '订单表'}
)
关系定义
关系是 ORM 最强大的功能之一,它将数据库的外键关系映射为 Python 对象属性。
一对多关系
最常见的关系统型,如"一个用户有多篇文章":
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80))
# 一对多关系
# lazy: 加载方式
# backref: 在另一方创建反向引用
posts = db.relationship('Post', backref='author', lazy='dynamic')
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100))
# 外键
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
使用关系:
# 获取用户的所有文章
user = User.query.get(1)
posts = user.posts.all() # 因为 lazy='dynamic',需要调用 all()
# 获取文章的作者
post = Post.query.get(1)
author = post.author # 通过 backref 自动创建
Lazy Loading 详解
lazy 参数控制关系的加载时机和方式:
| 值 | 行为 | 适用场景 |
|---|---|---|
select | 立即加载,使用 SELECT | 默认值,适合小集合 |
joined | 使用 JOIN 立即加载 | 需要一起显示时 |
subquery | 使用子查询立即加载 | 避免重复查询 |
dynamic | 返回可查询对象 | 需要进一步过滤 |
noload | 不加载 | 不需要关系数据 |
raise | 访问时抛出异常 | 防止意外加载 |
class User(db.Model):
# 立即加载(适合小集合)
posts = db.relationship('Post', lazy='select')
# 动态加载(适合大集合,需要进一步筛选)
posts = db.relationship('Post', lazy='dynamic')
# 使用:user.posts.filter_by(published=True).all()
# JOIN 加载(适合列表页显示)
posts = db.relationship('Post', lazy='joined')
一对一关系
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80))
# uselist=False 表示一对一
profile = db.relationship('Profile', backref='user', uselist=False)
class Profile(db.Model):
id = db.Column(db.Integer, primary_key=True)
bio = db.Column(db.Text)
avatar = db.Column(db.String(200))
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True)
使用:
user = User.query.get(1)
profile = user.profile # 返回单个对象,不是列表
profile.user # 反向引用
多对多关系
多对多关系需要中间表:
# 方式一:简单中间表(推荐用于简单关系)
student_course = db.Table(
'student_course',
db.Column('student_id', db.Integer, db.ForeignKey('students.id'), primary_key=True),
db.Column('course_id', db.Integer, db.ForeignKey('courses.id'), primary_key=True),
db.Column('enrolled_at', db.DateTime, default=datetime.utcnow) # 额外字段
)
class Student(db.Model):
__tablename__ = 'students'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
courses = db.relationship(
'Course',
secondary=student_course,
backref=db.backref('students', lazy='dynamic'),
lazy='dynamic'
)
class Course(db.Model):
__tablename__ = 'courses'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
使用:
# 添加关系
student = Student(name='Alice')
course = Course(name='Math')
student.courses.append(course)
db.session.commit()
# 查询
student.courses.all()
course.students.all()
# 删除关系
student.courses.remove(course)
带额外字段的多对多
当中间表需要存储额外信息时,需要定义为模型:
class Enrollment(db.Model):
"""选课记录(中间表模型)"""
__tablename__ = 'enrollments'
student_id = db.Column(db.Integer, db.ForeignKey('students.id'), primary_key=True)
course_id = db.Column(db.Integer, db.ForeignKey('courses.id'), primary_key=True)
# 额外字段
enrolled_at = db.Column(db.DateTime, default=datetime.utcnow)
grade = db.Column(db.String(2)) # 成绩等级
# 关系
student = db.relationship('Student', backref='enrollments')
course = db.relationship('Course', backref='enrollments')
class Student(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
class Course(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
使用:
# 创建选课记录
enrollment = Enrollment(
student=student,
course=course,
grade='A'
)
db.session.add(enrollment)
db.session.commit()
# 查询学生的所有课程
for enrollment in student.enrollments:
print(enrollment.course.name, enrollment.grade)
自引用关系
同一表内的关系,如关注、评论回复:
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80))
# 关注者(多对多自引用)
followers = db.relationship(
'User',
secondary=followers,
primaryjoin=(followers.c.followed_id == id),
secondaryjoin=(followers.c.follower_id == id),
backref=db.backref('following', lazy='dynamic'),
lazy='dynamic'
)
# 关注关系表
followers = db.Table(
'followers',
db.Column('follower_id', db.Integer, db.ForeignKey('users.id')),
db.Column('followed_id', db.Integer, db.ForeignKey('users.id'))
)
class Comment(db.Model):
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text)
# 回复(一对多自引用)
parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'))
replies = db.relationship('Comment', backref=db.backref('parent', remote_side=[id]))
关系级联删除
class User(db.Model):
# 级联删除:删除用户时自动删除其文章
posts = db.relationship(
'Post',
backref='author',
cascade='all, delete-orphan',
lazy='dynamic'
)
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id', ondelete='CASCADE'))
级联选项:
| 选项 | 说明 |
|---|---|
save-update | 保存时自动保存关联对象 |
delete | 删除时自动删除关联对象 |
delete-orphan | 从关系中移除时删除孤儿对象 |
merge | 合并会话时合并关联对象 |
refresh-expire | 刷新时过期关联对象 |
expunge | 从会话移除时同时移除关联对象 |
all | 包含以上所有 |
查询操作
基本查询
# 查询所有
users = User.query.all()
# 根据主键查询
user = User.query.get(1) # 返回 None 或对象
user = User.query.get_or_404(1) # 不存在返回 404
# 查询第一条
user = User.query.first()
# 条件查询
user = User.query.filter_by(username='john').first()
users = User.query.filter_by(is_active=True).all()
# 使用 filter(更灵活)
users = User.query.filter(User.username == 'john').all()
users = User.query.filter(User.username != 'john').all()
# LIKE 查询
users = User.query.filter(User.username.like('%john%')).all()
users = User.query.filter(User.username.ilike('%JOHN%')).all() # 不区分大小写
# IN 查询
users = User.query.filter(User.id.in_([1, 2, 3])).all()
# NOT IN
users = User.query.filter(~User.id.in_([1, 2, 3])).all()
# NULL 查询
users = User.query.filter(User.bio.is_(None)).all()
users = User.query.filter(User.bio.isnot(None)).all()
逻辑运算
from sqlalchemy import and_, or_, not_
# AND
users = User.query.filter(
User.is_active == True,
User.is_admin == False
).all()
users = User.query.filter(
and_(User.is_active == True, User.is_admin == False)
).all()
# OR
users = User.query.filter(
or_(User.username == 'john', User.email == '[email protected]')
).all()
# NOT
users = User.query.filter(
not_(User.is_admin)
).all()
# 组合
users = User.query.filter(
or_(
and_(User.is_active, User.username.like('j%')),
User.is_admin
)
).all()
排序与限制
# 排序
users = User.query.order_by(User.created_at).all() # 升序
users = User.query.order_by(User.created_at.desc()).all() # 降序
users = User.query.order_by(User.is_admin.desc(), User.username).all() # 多字段排序
# 限制数量
users = User.query.limit(10).all()
# 偏移(分页用)
users = User.query.offset(20).limit(10).all()
# 组合
users = User.query.order_by(User.created_at.desc()).limit(10).all()
分页
# 方式一:使用 paginate
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
pagination = User.query.paginate(
page=page,
per_page=per_page,
error_out=False # 超出页数返回空列表而非 404
)
users = pagination.items
total = pagination.total
pages = pagination.pages
has_next = pagination.has_next
has_prev = pagination.has_prev
next_num = pagination.next_num
prev_num = pagination.prev_num
# 在模板中使用
<div class="pagination">
{% if pagination.has_prev %}
<a href="{{ url_for('users', page=pagination.prev_num) }}">上一页</a>
{% endif %}
{% for p in pagination.iter_pages() %}
{% if p %}
{% if p == pagination.page %}
<span class="current">{{ p }}</span>
{% else %}
<a href="{{ url_for('users', page=p) }}">{{ p }}</a>
{% endif %}
{% else %}
<span>...</span>
{% endif %}
{% endfor %}
{% if pagination.has_next %}
<a href="{{ url_for('users', page=pagination.next_num) }}">下一页</a>
{% endif %}
</div>
聚合查询
from sqlalchemy import func
# 计数
count = User.query.count()
count = User.query.filter(User.is_active).count()
# 聚合函数
result = db.session.query(
func.count(User.id).label('total'),
func.max(User.created_at).label('latest'),
func.min(User.created_at).label('earliest'),
func.avg(User.age).label('avg_age')
).first()
# 分组统计
result = db.session.query(
User.is_admin,
func.count(User.id).label('count')
).group_by(User.is_admin).all()
# 分组过滤(HAVING)
result = db.session.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.id).having(func.count(Post.id) > 5).all()
连接查询
# JOIN
posts = Post.query.join(User).filter(User.is_admin == True).all()
# 指定连接条件
posts = Post.query.join(User, Post.user_id == User.id).all()
# 左连接
users = User.query.outerjoin(Post).all()
# 多表连接
results = db.session.query(User, Post, Comment)\
.join(Post, User.id == Post.user_id)\
.join(Comment, Post.id == Comment.post_id)\
.all()
# 使用别名的自连接
from sqlalchemy.orm import aliased
Manager = aliased(User)
Employee = aliased(User)
results = db.session.query(Manager, Employee)\
.join(Employee, Manager.id == Employee.manager_id)\
.all()
子查询
from sqlalchemy import subquery
# 子查询示例:查询有文章的用户
subq = db.session.query(Post.user_id).distinct().subquery()
users = User.query.filter(User.id.in_(subq)).all()
# 使用 exists
from sqlalchemy import exists
users = User.query.filter(
exists().where(Post.user_id == User.id)
).all()
# 相关子查询
from sqlalchemy import select
stmt = select(func.count(Post.id)).where(Post.user_id == User.id)
users = db.session.query(User, stmt.scalar_subquery().label('post_count')).all()
预加载优化
解决 N+1 查询问题:
from sqlalchemy.orm import joinedload, selectinload, subqueryload
# N+1 问题示例(不推荐)
users = User.query.all()
for user in users:
print(user.posts) # 每次循环都会执行一次查询
# 方式一:joinedload(JOIN 加载)
users = User.query.options(joinedload(User.posts)).all()
# 生成一条 JOIN 查询
# 方式二:selectinload(IN 查询)
users = User.query.options(selectinload(User.posts)).all()
# 先查询用户,再用 IN 查询文章
# 方式三:subqueryload(子查询)
users = User.query.options(subqueryload(User.posts)).all()
# 多层预加载
users = User.query.options(
joinedload(User.posts).joinedload(Post.comments)
).all()
# 按需加载
from sqlalchemy.orm import Load
users = User.query.options(
Load(User).joinedload(User.posts)
).all()
数据操作
创建记录
# 创建单条记录
user = User(username='john', email='[email protected]')
db.session.add(user)
db.session.commit()
# 批量创建
users = [
User(username='alice', email='[email protected]'),
User(username='bob', email='[email protected]'),
User(username='charlie', email='[email protected]')
]
db.session.add_all(users)
db.session.commit()
# 使用上下文管理器
with app.app_context():
user = User(username='john')
db.session.add(user)
db.session.commit()
# 创建并刷新(获取自增 ID)
user = User(username='john')
db.session.add(user)
db.session.flush() # 发送 SQL 但不提交
print(user.id) # 已经可以获取 ID
db.session.commit()
更新记录
# 方式一:修改属性
user = User.query.get(1)
user.username = 'new_username'
db.session.commit()
# 方式二:批量更新
User.query.filter_by(is_active=False).update({'status': 'inactive'})
db.session.commit()
# 方式三:使用 SQL 表达式
from sqlalchemy import func
User.query.filter(User.id == 1).update({
'login_count': User.login_count + 1,
'last_login': func.now()
}, synchronize_session=False)
db.session.commit()
删除记录
# 删除单条
user = User.query.get(1)
db.session.delete(user)
db.session.commit()
# 批量删除
User.query.filter_by(is_active=False).delete()
db.session.commit()
# 删除并级联
user = User.query.get(1)
db.session.delete(user)
db.session.commit() # 如果配置了级联删除,相关文章也会被删除
事务处理
# 方式一:自动提交
try:
user = User(username='john')
db.session.add(user)
db.session.commit()
except Exception as e:
db.session.rollback()
raise
# 方式二:使用上下文管理器
from sqlalchemy.exc import SQLAlchemyError
try:
with db.session.begin():
user = User(username='john')
db.session.add(user)
except SQLAlchemyError as e:
# 自动回滚
pass
# 方式三:嵌套事务(保存点)
with db.session.begin_nested():
user = User(username='john')
db.session.add(user)
会话管理
# 手动移除对象
db.session.expunge(user)
# 刷新对象状态
db.session.refresh(user)
# 使对象过期(下次访问时重新加载)
db.session.expire(user)
# 使所有对象过期
db.session.expire_all()
# 合并对象(从另一个会话合并)
user = User(username='john')
merged_user = db.session.merge(user)
数据库迁移
Flask-Migrate 提供了数据库版本控制和迁移管理。
初始化
# 初始化迁移目录
flask db init
这会创建 migrations 目录,包含迁移配置和脚本。
创建迁移
# 自动生成迁移脚本
flask db migrate -m "Add user table"
# 查看迁移脚本
cat migrations/versions/xxx_add_user_table.py
迁移脚本示例:
"""Add user table
Revision ID: abc123
Revises:
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'abc123'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic ###
op.create_table('users',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=80), nullable=False),
sa.Column('email', sa.String(length=120), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('email'),
sa.UniqueConstraint('username')
)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic ###
op.drop_table('users')
# ### end Alembic commands ###
应用迁移
# 应用最新迁移
flask db upgrade
# 迁移到指定版本
flask db upgrade abc123
# 回滚一个版本
flask db downgrade
# 回滚到指定版本
flask db downgrade xyz789
迁移历史
# 查看迁移历史
flask db history
# 查看当前版本
flask db current
# 标记当前数据库版本
flask db stamp head
手动迁移
有时自动生成的迁移需要手动修改:
def upgrade():
# 添加列
op.add_column('users', sa.Column('bio', sa.Text(), nullable=True))
# 创建索引
op.create_index('idx_users_username', 'users', ['username'])
# 执行原始 SQL
op.execute("UPDATE users SET bio = '' WHERE bio IS NULL")
# 添加外键
op.create_foreign_key(
'fk_posts_user_id',
'posts', 'users',
['user_id'], ['id'],
ondelete='CASCADE'
)
def downgrade():
op.drop_column('users', 'bio')
op.drop_index('idx_users_username')
op.drop_constraint('fk_posts_user_id', 'posts', type_='foreignkey')
高级主题
混合属性
在模型上定义既可用于查询又可用于 Python 的属性:
from sqlalchemy.ext.hybrid import hybrid_property, hybrid_method
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
first_name = db.Column(db.String(50))
last_name = db.Column(db.String(50))
@hybrid_property
def full_name(self):
"""Python 属性访问"""
return f'{self.first_name} {self.last_name}'
@full_name.expression
def full_name(cls):
"""SQL 表达式"""
return db.func.concat(cls.first_name, ' ', cls.last_name)
@hybrid_method
def age(self, birth_year):
"""方法"""
import datetime
return datetime.datetime.now().year - birth_year
# 使用
user = User(first_name='John', last_name='Doe')
print(user.full_name) # Python 访问
users = User.query.filter(User.full_name.like('%John%')).all() # SQL 查询
数据库事件
使用事件钩子执行自定义逻辑:
from sqlalchemy import event
@event.listens_for(User, 'before_insert')
def user_before_insert(mapper, connection, target):
"""插入前处理密码"""
if target.password:
target.password_hash = generate_password_hash(target.password)
@event.listens_for(User, 'after_update')
def user_after_update(mapper, connection, target):
"""更新后记录日志"""
log = AuditLog(
table='users',
record_id=target.id,
action='update'
)
connection.execute(log.__table__.insert())
@event.listens_for(db.session, 'before_commit')
def before_commit(session):
"""提交前处理"""
for obj in session.new:
if isinstance(obj, User):
obj.created_at = datetime.utcnow()
模型方法
class User(db.Model):
# ... 字段定义 ...
def set_password(self, password):
"""设置密码哈希"""
self.password_hash = generate_password_hash(password)
def check_password(self, password):
"""验证密码"""
return check_password_hash(self.password_hash, password)
@classmethod
def get_by_username(cls, username):
"""类方法查询"""
return cls.query.filter_by(username=username).first()
@staticmethod
def generate_fake(count=10):
"""生成测试数据"""
from faker import Faker
fake = Faker()
for _ in range(count):
user = User(
username=fake.user_name(),
email=fake.email(),
bio=fake.text()
)
db.session.add(user)
db.session.commit()
多数据库支持
# 配置
app.config['SQLALCHEMY_BINDS'] = {
'users': 'sqlite:///users.db',
'posts': 'sqlite:///posts.db',
'analytics': 'postgresql://localhost/analytics'
}
# 模型绑定
class User(db.Model):
__bind_key__ = 'users' # 指定数据库
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80))
class Post(db.Model):
__bind_key__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100))
# 操作指定数据库
users = User.query.all() # 使用 users 数据库
posts = Post.query.all() # 使用 posts 数据库
# 直接访问引擎
db.get_engine(app, bind='analytics')
最佳实践
1. 使用环境变量存储敏感配置
import os
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY')
2. 合理配置连接池
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10,
'pool_recycle': 3600,
'pool_pre_ping': True,
'max_overflow': 5
}
3. 使用只读副本分担读压力
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
# 主库(写入)
engine_main = create_engine(DATABASE_URL_MAIN)
# 从库(读取)
engine_replica = create_engine(DATABASE_URL_REPLICA)
# 根据操作选择引擎
4. 避免 N+1 查询
# 错误示例
posts = Post.query.all()
for post in posts:
print(post.author.username) # 每次循环都查询
# 正确示例
posts = Post.query.options(joinedload(Post.author)).all()
for post in posts:
print(post.author.username) # 无额外查询
5. 使用索引优化查询
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, index=True)
email = db.Column(db.String(120), unique=True, index=True)
__table_args__ = (
db.Index('idx_user_created', 'created_at'),
)
6. 合理处理事务
# 短事务
def transfer_money(from_id, to_id, amount):
try:
from_user = User.query.with_for_update().get(from_id)
to_user = User.query.with_for_update().get(to_id)
from_user.balance -= amount
to_user.balance += amount
db.session.commit()
except Exception:
db.session.rollback()
raise
7. 使用模型验证
from sqlalchemy import event
from sqlalchemy.exc import IntegrityError
def validate_email(target, value, oldvalue, initiator):
"""验证邮箱格式"""
if value and '@' not in value:
raise ValueError('无效的邮箱格式')
return value
event.listen(User.email, 'set', validate_email)
小结
本章深入探讨了 Flask-SQLAlchemy 的各个方面:
- ORM 基础:理解 ORM 的价值和 SQLAlchemy 的架构
- 模型定义:字段类型、列选项、表级约束
- 关系映射:一对多、一对一、多对多、自引用关系
- 查询操作:过滤、排序、分页、聚合、连接、子查询
- 数据操作:CRUD、事务、会话管理
- 数据库迁移:使用 Flask-Migrate 管理数据库版本
- 最佳实践:性能优化、安全配置、代码组织
掌握数据库操作是构建复杂 Web 应用的基础,希望本章能帮助你建立坚实的数据库知识体系。