跳到主要内容

异步支持

Flask 2.0 引入了原生异步支持,允许使用 async/await 语法定义路由、错误处理器和请求回调。这一功能使得在 Flask 中处理并发 I/O 操作变得更加自然和高效。

理解异步编程

在深入 Flask 的异步支持之前,我们需要理解同步与异步编程的区别。

同步 vs 异步

同步执行:代码按顺序一行一行执行,每行代码必须完成后才能执行下一行。当遇到耗时操作(如网络请求、数据库查询)时,程序会阻塞等待,无法做其他事情。

# 同步执行 - 串行处理
def get_user_data():
user = get_user_from_db() # 等待 100ms
orders = get_orders_from_db() # 等待 150ms
reviews = get_reviews_from_api() # 等待 200ms
return {'user': user, 'orders': orders, 'reviews': reviews}
# 总耗时:100 + 150 + 200 = 450ms

异步执行:当代码遇到耗时操作时,可以切换去执行其他任务,等操作完成后再回来继续。这样可以并发处理多个 I/O 操作。

# 异步执行 - 并发处理
async def get_user_data():
# 同时发起三个请求,等待时间由最慢的决定
user, orders, reviews = await asyncio.gather(
get_user_from_db(), # 100ms
get_orders_from_db(), # 150ms
get_reviews_from_api() # 200ms
)
return {'user': user, 'orders': orders, 'reviews': reviews}
# 总耗时:max(100, 150, 200) = 200ms

何时使用异步

异步编程适合 I/O 密集型 操作:

  • 并发调用多个外部 API
  • 并发执行多个数据库查询
  • 文件系统操作
  • 网络请求

异步编程 不适合

  • CPU 密集型任务:如机器学习推理、图像处理、PDF 生成
  • 后台任务:如发送邮件、数据处理流水线
  • 长时间运行的任务:如视频转码、数据备份

对于这些场景,应该使用任务队列(如 Celery)而不是异步视图。

安装和配置

要使用 Flask 的异步功能,需要安装带有 async 额外依赖的 Flask:

pip install "Flask[async]"

这会安装 asgiref 库,Flask 使用它来在同步上下文中运行异步代码。

版本要求

  • Python 3.7+(推荐 3.9+,Windows 上 3.8 有已知的 asyncio 问题)
  • Flask 2.0+
  • 使用 gevent 或 eventlet 时需要 greenlet >= 1.0

异步视图函数

基本用法

定义异步视图非常简单,只需使用 async def

from flask import Flask, jsonify
import asyncio

app = Flask(__name__)

@app.route('/api/data')
async def get_data():
"""异步视图函数"""
# 模拟异步操作
await asyncio.sleep(1)
return jsonify({'message': 'Data loaded asynchronously'})

并发 HTTP 请求

异步视图最常见的用例是并发调用多个外部 API:

import aiohttp
from flask import Flask, jsonify

app = Flask(__name__)

async def fetch_url(session, url):
"""异步获取单个 URL"""
async with session.get(url) as response:
return {
'url': url,
'status': response.status,
'data': await response.json()
}

@app.route('/api/users/<int:user_id>')
async def get_user_profile(user_id):
"""并发获取用户相关的多个数据源"""
async with aiohttp.ClientSession() as session:
# 同时发起多个请求
user, posts, comments = await asyncio.gather(
session.get(f'https://api.example.com/users/{user_id}'),
session.get(f'https://api.example.com/users/{user_id}/posts'),
session.get(f'https://api.example.com/users/{user_id}/comments')
)

return jsonify({
'user': await user.json(),
'posts': await posts.json(),
'comments': await comments.json()
})

异步数据库查询

使用支持异步的数据库驱动:

from flask import Flask, jsonify
import asyncpg

app = Flask(__name__)

# 数据库连接池
async def get_db_pool():
return await asyncpg.create_pool(
host='localhost',
port=5432,
user='postgres',
password='password',
database='myapp'
)

@app.route('/api/users')
async def list_users():
"""异步查询用户列表"""
pool = await get_db_pool()
async with pool.acquire() as conn:
users = await conn.fetch('SELECT id, username, email FROM users')
return jsonify([dict(user) for user in users])

与 SQLAlchemy 异步集成

SQLAlchemy 1.4+ 支持异步操作:

from flask import Flask, jsonify
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

app = Flask(__name__)

# 异步数据库引擎
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/myapp')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession)

@app.route('/api/products')
async def list_products():
"""异步查询产品"""
async with AsyncSessionLocal() as session:
result = await session.execute(select(Product))
products = result.scalars().all()
return jsonify([p.to_dict() for p in products])

异步请求回调

Flask 支持异步的 before_requestafter_requestteardown_request

from flask import Flask, request, g
import asyncio

app = Flask(__name__)

async def load_current_user():
"""模拟从数据库异步加载用户"""
await asyncio.sleep(0.1) # 模拟数据库查询
return {'id': 1, 'username': 'alice'}

async def log_request_info(response):
"""异步记录请求日志"""
await asyncio.sleep(0.05) # 模拟写入日志
app.logger.info(f'{request.method} {request.path} - {response.status_code}')
return response

@app.before_request
async def before_request():
"""请求前异步加载用户"""
g.current_user = await load_current_user()

@app.after_request
async def after_request(response):
"""请求后异步记录日志"""
return await log_request_info(response)

@app.teardown_request
async def teardown_request(exception=None):
"""请求结束时清理资源"""
if exception:
app.logger.error(f'Request failed: {exception}')

异步错误处理器

错误处理器也可以是异步的:

from flask import Flask, jsonify, render_template
import asyncio

app = Flask(__name__)

async def send_error_notification(error):
"""异步发送错误通知"""
await asyncio.sleep(0.1) # 模拟发送邮件或消息
app.logger.error(f'Error occurred: {error}')

@app.errorhandler(404)
async def not_found(error):
"""异步处理 404 错误"""
await send_error_notification(error)
return jsonify({'error': 'Not found', 'message': str(error)}), 404

@app.errorhandler(500)
async def internal_error(error):
"""异步处理 500 错误"""
await send_error_notification(error)
return render_template('500.html'), 500

@app.errorhandler(Exception)
async def handle_exception(error):
"""异步处理所有未捕获的异常"""
await send_error_notification(error)
return jsonify({'error': 'Internal server error'}), 500

异步类视图

基于类的视图同样支持异步:

from flask.views import View, MethodView
from flask import Flask, jsonify

app = Flask(__name__)

# 继承 View 的异步类视图
class AsyncUserAPI(View):
methods = ['GET', 'POST']

async def dispatch_request(self):
# 异步处理请求
return jsonify({'message': 'Async class view'})

app.add_url_rule('/api/users', view_func=AsyncUserAPI.as_view('users'))

# 继承 MethodView 的异步类视图
class AsyncProductAPI(MethodView):
async def get(self, product_id):
"""获取产品"""
product = await self.get_product(product_id)
return jsonify(product)

async def post(self):
"""创建产品"""
product = await self.create_product()
return jsonify(product), 201

async def put(self, product_id):
"""更新产品"""
product = await self.update_product(product_id)
return jsonify(product)

async def delete(self, product_id):
"""删除产品"""
await self.delete_product(product_id)
return '', 204

async def get_product(self, product_id):
# 模拟异步数据库查询
return {'id': product_id, 'name': 'Product'}

async def create_product(self):
# 模拟异步创建
return {'id': 1, 'name': 'New Product'}

async def update_product(self, product_id):
# 模拟异步更新
return {'id': product_id, 'name': 'Updated Product'}

async def delete_product(self, product_id):
# 模拟异步删除
pass

app.add_url_rule('/api/products', view_func=AsyncProductAPI.as_view('products'))
app.add_url_rule('/api/products/<int:product_id>', view_func=AsyncProductAPI.as_view('product'))

性能考量

Flask 异步的工作原理

Flask 是一个 WSGI 框架,本身是同步的。当处理异步视图时,Flask 会:

  1. 在一个子线程中创建事件循环
  2. 在事件循环中运行异步视图函数
  3. 等待异步函数完成后返回结果

这意味着:

  • 每个请求仍然占用一个 worker
  • 异步不会增加并发处理能力
  • 但可以在单个请求内并发执行多个 I/O 操作

同步 vs 异步性能对比

import time
import aiohttp
import requests
from flask import Flask, jsonify

app = Flask(__name__)

# 同步版本 - 串行请求
@app.route('/sync')
def sync_fetch():
start = time.time()
urls = [
'https://api.github.com/users/python',
'https://api.github.com/users/flask',
'https://api.github.com/users/pallets'
]
results = []
for url in urls:
response = requests.get(url)
results.append(response.json())

return jsonify({
'time': time.time() - start,
'results': results
})
# 平均耗时:3-4.5 秒(串行等待)

# 异步版本 - 并发请求
@app.route('/async')
async def async_fetch():
start = time.time()
urls = [
'https://api.github.com/users/python',
'https://api.github.com/users/flask',
'https://api.github.com/users/pallets'
]

async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
results = [await r.json() for r in responses]

return jsonify({
'time': time.time() - start,
'results': results
})
# 平均耗时:0.4-0.6 秒(并发等待,由最慢的请求决定)

在这个例子中,异步版本比同步版本快约 8 倍

性能建议

  1. 只在需要并发 I/O 时使用异步:单次数据库查询或 API 调用不需要异步
  2. 避免 CPU 密集型操作:异步不会让 CPU 计算变快
  3. 使用连接池:重用数据库和 HTTP 连接
  4. 合理设置超时:避免单个慢请求拖垮整体性能
import aiohttp
from flask import Flask, jsonify

app = Flask(__name__)

# 配置超时
TIMEOUT = aiohttp.ClientTimeout(total=10)

async def fetch_with_timeout(session, url):
"""带超时的请求"""
try:
async with session.get(url, timeout=TIMEOUT) as response:
return await response.json()
except asyncio.TimeoutError:
return {'error': 'Request timeout'}

后台任务的限制

Flask 的异步支持 不支持 在视图中生成后台任务:

# 这不会按预期工作!
@app.route('/send-email')
async def send_email():
# 这个任务会被取消,因为视图函数返回后事件循环就停止了
asyncio.create_task(send_email_background())
return jsonify({'status': 'email will be sent'})

如果需要后台任务,应该使用任务队列:

# 使用 Celery 处理后台任务
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379')

@celery.task
def send_email_task(to, subject, body):
# 发送邮件的逻辑
pass

@app.route('/send-email')
def send_email():
send_email_task.delay('[email protected]', 'Hello', 'World')
return jsonify({'status': 'email will be sent'})

何时考虑 Quart

如果你的应用主要是异步代码,可以考虑使用 Quart。Quart 是 Flask API 的 ASGI 重新实现,提供真正的异步支持:

特性FlaskQuart
协议WSGIASGI
异步视图支持(在子线程中运行)原生支持
并发请求受限于 worker 数量可以处理大量并发
WebSocket需要扩展原生支持
长连接不支持支持
Flask 扩展完全兼容大部分兼容
# Quart 示例 - 与 Flask 几乎相同的 API
from quart import Quart, jsonify, websocket

app = Quart(__name__)

@app.route('/')
async def hello():
return jsonify({'message': 'Hello from Quart!'})

@app.websocket('/ws')
async def ws():
while True:
data = await websocket.receive()
await websocket.send(f"Echo: {data}")

if __name__ == '__main__':
app.run() # 使用 Hypercorn 运行:hypercorn app:app

扩展兼容性

Flask 2.0 之前的扩展可能不支持异步视图。扩展作者可以使用 ensure_sync() 方法来支持异步:

from functools import wraps
from flask import current_app

def my_decorator(f):
"""支持异步视图的装饰器"""
@wraps(f)
async def async_wrapper(*args, **kwargs):
# 异步逻辑
return await f(*args, **kwargs)

@wraps(f)
def sync_wrapper(*args, **kwargs):
# 同步逻辑
return f(*args, **kwargs)

# 根据函数类型选择包装器
import asyncio
if asyncio.iscoroutinefunction(f):
return async_wrapper
return sync_wrapper

在使用扩展之前,检查其文档确认是否支持异步。

测试异步视图

使用 pytest 测试异步视图与同步视图类似:

import pytest
from myapp import app

@pytest.fixture
def client():
app.config['TESTING'] = True
with app.test_client() as client:
yield client

def test_async_route(client):
"""测试异步路由"""
response = client.get('/api/async-data')
assert response.status_code == 200
assert 'data' in response.json

def test_async_error_handler(client):
"""测试异步错误处理器"""
response = client.get('/nonexistent')
assert response.status_code == 404

对于独立的异步函数,可以使用 pytest-asyncio

import pytest
import aiohttp

@pytest.mark.asyncio
async def test_fetch_url():
"""测试异步 HTTP 客户端"""
async with aiohttp.ClientSession() as session:
async with session.get('https://httpbin.org/get') as response:
assert response.status == 200
data = await response.json()
assert 'url' in data

最佳实践

1. 选择正确的场景

# 好:并发 I/O 操作
@app.route('/api/dashboard')
async def dashboard():
stats, users, alerts = await asyncio.gather(
get_stats(),
get_users(),
get_alerts()
)
return jsonify({'stats': stats, 'users': users, 'alerts': alerts})

# 不推荐:单个数据库查询(同步更简单)
@app.route('/api/user/<int:id>')
async def get_user(id): # 不必要的异步
user = await db.get_user(id) # 单个查询,异步没有优势
return jsonify(user)

# 推荐:单个查询保持同步
@app.route('/api/user/<int:id>')
def get_user(id):
user = db.get_user(id)
return jsonify(user)

2. 使用连接池

# 全局连接池
import aiohttp

http_session = None

async def get_http_session():
global http_session
if http_session is None:
http_session = aiohttp.ClientSession()
return http_session

@app.teardown_appcontext
async def close_http_session(exception=None):
global http_session
if http_session:
await http_session.close()
http_session = None

@app.route('/api/data')
async def get_data():
session = await get_http_session()
async with session.get('https://api.example.com/data') as response:
return await response.json()

3. 处理异常

@app.route('/api/external')
async def call_external_api():
try:
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as response:
if response.status != 200:
return jsonify({'error': 'API error'}), 502
return await response.json()
except aiohttp.ClientError as e:
app.logger.error(f'HTTP client error: {e}')
return jsonify({'error': 'Connection error'}), 503
except asyncio.TimeoutError:
return jsonify({'error': 'Request timeout'}), 504

4. 超时控制

import asyncio
from functools import wraps

def async_timeout(seconds):
"""异步超时装饰器"""
def decorator(f):
@wraps(f)
async def wrapper(*args, **kwargs):
try:
return await asyncio.wait_for(f(*args, **kwargs), timeout=seconds)
except asyncio.TimeoutError:
return jsonify({'error': 'Operation timeout'}), 504
return wrapper
return decorator

@app.route('/api/slow')
@async_timeout(5)
async def slow_operation():
await asyncio.sleep(10) # 这会触发超时
return jsonify({'status': 'ok'})

小结

本章介绍了 Flask 的异步支持:

  1. 异步基础:理解何时使用异步,何时使用同步
  2. 异步视图:使用 async def 定义路由,并发处理 I/O
  3. 异步回调before_requestafter_request 的异步版本
  4. 性能考量:异步不会增加并发能力,但能加速单请求内的并发 I/O
  5. 扩展兼容:检查扩展是否支持异步视图

Flask 的异步支持是一个强大的工具,但不是万能的。正确理解其工作原理和适用场景,才能发挥其最大价值。

参考资料