最佳实践
本章总结 asyncio 开发中的常见陷阱、性能优化技巧和最佳实践。
常见陷阱
1. 阻塞事件循环
问题:在协程中调用同步阻塞函数会阻塞整个事件循环。
import asyncio
import time
async def bad_example():
# 错误:time.sleep 会阻塞事件循环
time.sleep(5) # 这 5 秒内,所有协程都无法执行
return "完成"
async def good_example():
# 正确:使用 asyncio.sleep
await asyncio.sleep(5) # 其他协程可以在这期间执行
return "完成"
解决方案:对于必须使用的阻塞函数,使用 run_in_executor 在线程池中执行:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_function():
"""阻塞函数"""
time.sleep(5)
return "完成"
async def main():
# 在线程池中执行阻塞函数
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)
print(result)
asyncio.run(main())
或使用 asyncio.to_thread(Python 3.9+):
async def main():
result = await asyncio.to_thread(blocking_function)
print(result)
2. 忘记 await
问题:调用协程函数但不 await,导致协程未执行。
import asyncio
async def fetch_data():
await asyncio.sleep(1)
return "数据"
async def bad_example():
data = fetch_data() # 只是创建了协程对象!
print(data) # <coroutine object fetch_data at 0x...>
async def good_example():
data = await fetch_data() # 正确:等待协程完成
print(data) # "数据"
asyncio.run(good_example())
解决方案:启用 Python 警告,检查未 await 的协程:
# 运行时添加警告
import warnings
warnings.simplefilter('always', ResourceWarning)
3. 在协程外使用 async 语法
问题:在普通函数中使用 await 或 async with。
# 错误:普通函数不能使用 await
def normal_function():
await asyncio.sleep(1) # SyntaxError
# 正确:必须是协程函数
async def async_function():
await asyncio.sleep(1)
4. 混用同步和异步代码
问题:在异步代码中混入同步 I/O 操作。
import asyncio
import requests # 同步 HTTP 库
async def bad_example():
# 错误:requests 是同步库,会阻塞事件循环
response = requests.get("https://api.example.com")
return response.json()
# 正确:使用异步 HTTP 库
import aiohttp
async def good_example():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as response:
return await response.json()
5. 任务引用丢失
问题:创建任务但不保存引用,可能被垃圾回收。
import asyncio
async def background_task():
await asyncio.sleep(10)
print("后台任务完成")
async def bad_example():
# 错误:任务可能被垃圾回收
asyncio.create_task(background_task())
await asyncio.sleep(1)
# 任务可能在这里被回收!
async def good_example():
# 正确:保存任务引用
task = asyncio.create_task(background_task())
await asyncio.sleep(1)
await task # 确保任务完成
# 或者使用集合管理
async def better_example():
tasks = set()
for i in range(10):
task = asyncio.create_task(background_task())
tasks.add(task)
task.add_done_callback(tasks.discard)
await asyncio.sleep(15) # 等待任务完成
6. 异常处理不当
问题:协程中的异常未被正确捕获。
import asyncio
async def failing_task():
raise ValueError("出错了!")
async def bad_example():
task = asyncio.create_task(failing_task())
# 不等待任务,异常被静默忽略
await asyncio.sleep(1)
async def good_example():
task = asyncio.create_task(failing_task())
try:
await task
except ValueError as e:
print(f"捕获异常: {e}")
性能优化
1. 使用 TaskGroup 替代 gather
Python 3.11+ 的 TaskGroup 提供更好的异常处理和资源管理:
import asyncio
# 旧方式:gather
async def old_way():
results = await asyncio.gather(
task1(),
task2(),
task3(),
)
# 新方式:TaskGroup(推荐)
async def new_way():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task1())
t2 = tg.create_task(task2())
t3 = tg.create_task(task3())
results = [t1.result(), t2.result(), t3.result()]
2. 使用 Eager Task Factory(Python 3.12+)
eager_task_factory 是 Python 3.12 引入的新特性,可以让协程在创建时立即执行,而不是等待事件循环调度。
传统任务执行流程:
import asyncio
async def quick_task():
return 42 # 立即返回,没有异步操作
async def main():
# 传统方式:任务被调度到事件循环
task = asyncio.create_task(quick_task())
# 任务不会立即执行,需要等到下一个事件循环迭代
result = await task
使用 eager_task_factory:
import asyncio
async def quick_task():
return 42
async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)
# 任务会立即执行
task = asyncio.create_task(quick_task())
# 如果任务同步完成,不需要事件循环调度
print(task.done()) # 可能已经是 True
性能优势场景:
import asyncio
# 带缓存的异步函数
_cache = {}
async def fetch_with_cache(key):
"""使用缓存的异步获取"""
if key in _cache:
# 缓存命中,直接返回
return _cache[key]
# 缓存未命中,执行异步操作
await asyncio.sleep(1)
_cache[key] = f"value-{key}"
return _cache[key]
async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)
# 第一次调用:需要异步操作
result1 = await fetch_with_cache("key1")
# 第二次调用:缓存命中,无需事件循环调度
result2 = await fetch_with_cache("key1") # 立即返回
使用自定义 Task 类:
import asyncio
class CustomTask(asyncio.Task):
"""自定义 Task 类,添加额外功能"""
pass
async def main():
loop = asyncio.get_running_loop()
# 创建使用自定义 Task 的 eager factory
factory = asyncio.create_eager_task_factory(CustomTask)
loop.set_task_factory(factory)
# 现在创建的任务使用 CustomTask 类
task = asyncio.create_task(some_coro())
注意事项:
# eager_task_factory 会改变执行语义
# 任务可能在 create_task 调用期间就完成
async def example():
print("开始")
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)
task = asyncio.create_task(asyncio.sleep(0))
# 此时 task 可能已经开始执行或已完成
print("任务创建后")
await task
print("任务完成")
# 输出顺序可能与预期不同
何时使用:
| 场景 | 推荐 |
|---|---|
| 带缓存的异步操作 | ✅ 推荐 |
| 快速完成的协程 | ✅ 推荐 |
| 需要严格执行顺序 | ⚠️ 谨慎 |
| 与旧代码兼容 | ⚠️ 测试后使用 |
2. 控制并发数量
使用 Semaphore 避免过度并发:
import asyncio
semaphore = asyncio.Semaphore(10) # 最多 10 个并发
async def fetch(url):
async with semaphore:
# 执行请求
pass
async def main():
urls = [...] # 1000 个 URL
await asyncio.gather(*[fetch(url) for url in urls])
3. 批量处理
对于大量小任务,批量处理更高效:
import asyncio
async def process_batch(items):
"""处理一批数据"""
await asyncio.sleep(0.1)
return [item * 2 for item in items]
async def main():
items = list(range(1000))
batch_size = 100
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
results.extend(await process_batch(batch))
3. 连接复用
复用 HTTP 连接和数据库连接:
import aiohttp
async def fetch_all(urls):
# 复用 session,避免重复创建连接
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.text()
return await asyncio.gather(*[fetch(url) for url in urls])
代码组织
1. 分离同步和异步代码
# utils.py - 同步工具函数
def parse_json(data):
return json.loads(data)
def format_output(data):
return json.dumps(data, indent=2)
# async_utils.py - 异步工具函数
async def fetch_json(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# main.py - 主程序
async def main():
data = await fetch_json("https://api.example.com")
parsed = parse_json(data) # 同步处理
print(format_output(parsed))
2. 使用依赖注入
import asyncio
from dataclasses import dataclass
@dataclass
class Config:
base_url: str
timeout: float = 10.0
class APIClient:
def __init__(self, config: Config):
self.config = config
async def fetch(self, endpoint):
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.config.base_url}{endpoint}",
timeout=aiohttp.ClientTimeout(self.config.timeout)
) as response:
return await response.json()
async def main():
config = Config(base_url="https://api.example.com")
client = APIClient(config)
data = await client.fetch("/users")
3. 清理资源
使用 try/finally 或上下文管理器确保资源清理:
import asyncio
class AsyncResource:
async def __aenter__(self):
print("获取资源")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
async def do_something(self):
print("使用资源")
async def main():
async with AsyncResource() as resource:
await resource.do_something()
调试技巧
1. 启用调试模式
import asyncio
async def main():
# 开启调试模式
asyncio.get_running_loop().set_debug(True)
# 或在运行时
asyncio.run(main(), debug=True)
调试模式会:
- 检测未 await 的协程
- 报告长时间阻塞的操作
- 显示更详细的错误信息
2. 记录任务状态
import asyncio
async def debug_tasks():
"""打印所有任务状态"""
tasks = asyncio.all_tasks()
for task in tasks:
print(f"任务: {task.get_name()}, 状态: {task._state}")
3. 使用 logging
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
async def worker(n):
logger.debug(f"Worker {n} 开始")
await asyncio.sleep(1)
logger.debug(f"Worker {n} 完成")
测试异步代码
使用 pytest-asyncio
# test_example.py
import pytest
@pytest.mark.asyncio
async def test_async_function():
result = await some_async_function()
assert result == expected_value
@pytest.mark.asyncio
async def test_with_timeout():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_function(), timeout=1.0)
模拟异步函数
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
with patch('module.async_function') as mock:
mock.return_value = "mocked result"
result = await call_async_function()
assert result == "mocked result"
小结
| 类别 | 最佳实践 |
|---|---|
| 避免阻塞 | 使用 to_thread 或 run_in_executor |
| 异常处理 | 总是 await 任务,捕获异常 |
| 资源管理 | 使用 async with,保存任务引用 |
| 并发控制 | 使用 Semaphore 限制并发 |
| 代码组织 | 分离同步/异步,使用依赖注入 |
| 调试 | 启用调试模式,记录日志 |
| 测试 | 使用 pytest-asyncio |
遵循这些最佳实践,可以编写出高效、可靠、易维护的异步代码。