跳到主要内容

最佳实践

本章总结 asyncio 开发中的常见陷阱、性能优化技巧和最佳实践。

常见陷阱

1. 阻塞事件循环

问题:在协程中调用同步阻塞函数会阻塞整个事件循环。

import asyncio
import time

async def bad_example():
# 错误:time.sleep 会阻塞事件循环
time.sleep(5) # 这 5 秒内,所有协程都无法执行
return "完成"

async def good_example():
# 正确:使用 asyncio.sleep
await asyncio.sleep(5) # 其他协程可以在这期间执行
return "完成"

解决方案:对于必须使用的阻塞函数,使用 run_in_executor 在线程池中执行:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_function():
"""阻塞函数"""
time.sleep(5)
return "完成"

async def main():
# 在线程池中执行阻塞函数
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)
print(result)

asyncio.run(main())

或使用 asyncio.to_thread(Python 3.9+):

async def main():
result = await asyncio.to_thread(blocking_function)
print(result)

2. 忘记 await

问题:调用协程函数但不 await,导致协程未执行。

import asyncio

async def fetch_data():
await asyncio.sleep(1)
return "数据"

async def bad_example():
data = fetch_data() # 只是创建了协程对象!
print(data) # <coroutine object fetch_data at 0x...>

async def good_example():
data = await fetch_data() # 正确:等待协程完成
print(data) # "数据"

asyncio.run(good_example())

解决方案:启用 Python 警告,检查未 await 的协程:

# 运行时添加警告
import warnings
warnings.simplefilter('always', ResourceWarning)

3. 在协程外使用 async 语法

问题:在普通函数中使用 awaitasync with

# 错误:普通函数不能使用 await
def normal_function():
await asyncio.sleep(1) # SyntaxError

# 正确:必须是协程函数
async def async_function():
await asyncio.sleep(1)

4. 混用同步和异步代码

问题:在异步代码中混入同步 I/O 操作。

import asyncio
import requests # 同步 HTTP 库

async def bad_example():
# 错误:requests 是同步库,会阻塞事件循环
response = requests.get("https://api.example.com")
return response.json()

# 正确:使用异步 HTTP 库
import aiohttp

async def good_example():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as response:
return await response.json()

5. 任务引用丢失

问题:创建任务但不保存引用,可能被垃圾回收。

import asyncio

async def background_task():
await asyncio.sleep(10)
print("后台任务完成")

async def bad_example():
# 错误:任务可能被垃圾回收
asyncio.create_task(background_task())
await asyncio.sleep(1)
# 任务可能在这里被回收!

async def good_example():
# 正确:保存任务引用
task = asyncio.create_task(background_task())
await asyncio.sleep(1)
await task # 确保任务完成

# 或者使用集合管理
async def better_example():
tasks = set()
for i in range(10):
task = asyncio.create_task(background_task())
tasks.add(task)
task.add_done_callback(tasks.discard)

await asyncio.sleep(15) # 等待任务完成

6. 异常处理不当

问题:协程中的异常未被正确捕获。

import asyncio

async def failing_task():
raise ValueError("出错了!")

async def bad_example():
task = asyncio.create_task(failing_task())
# 不等待任务,异常被静默忽略
await asyncio.sleep(1)

async def good_example():
task = asyncio.create_task(failing_task())
try:
await task
except ValueError as e:
print(f"捕获异常: {e}")

性能优化

1. 使用 TaskGroup 替代 gather

Python 3.11+ 的 TaskGroup 提供更好的异常处理和资源管理:

import asyncio

# 旧方式:gather
async def old_way():
results = await asyncio.gather(
task1(),
task2(),
task3(),
)

# 新方式:TaskGroup(推荐)
async def new_way():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task1())
t2 = tg.create_task(task2())
t3 = tg.create_task(task3())
results = [t1.result(), t2.result(), t3.result()]

2. 使用 Eager Task Factory(Python 3.12+)

eager_task_factory 是 Python 3.12 引入的新特性,可以让协程在创建时立即执行,而不是等待事件循环调度。

传统任务执行流程

import asyncio

async def quick_task():
return 42 # 立即返回,没有异步操作

async def main():
# 传统方式:任务被调度到事件循环
task = asyncio.create_task(quick_task())
# 任务不会立即执行,需要等到下一个事件循环迭代
result = await task

使用 eager_task_factory

import asyncio

async def quick_task():
return 42

async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)

# 任务会立即执行
task = asyncio.create_task(quick_task())
# 如果任务同步完成,不需要事件循环调度
print(task.done()) # 可能已经是 True

性能优势场景

import asyncio

# 带缓存的异步函数
_cache = {}

async def fetch_with_cache(key):
"""使用缓存的异步获取"""
if key in _cache:
# 缓存命中,直接返回
return _cache[key]

# 缓存未命中,执行异步操作
await asyncio.sleep(1)
_cache[key] = f"value-{key}"
return _cache[key]

async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)

# 第一次调用:需要异步操作
result1 = await fetch_with_cache("key1")

# 第二次调用:缓存命中,无需事件循环调度
result2 = await fetch_with_cache("key1") # 立即返回

使用自定义 Task 类

import asyncio

class CustomTask(asyncio.Task):
"""自定义 Task 类,添加额外功能"""
pass

async def main():
loop = asyncio.get_running_loop()

# 创建使用自定义 Task 的 eager factory
factory = asyncio.create_eager_task_factory(CustomTask)
loop.set_task_factory(factory)

# 现在创建的任务使用 CustomTask 类
task = asyncio.create_task(some_coro())

注意事项

# eager_task_factory 会改变执行语义
# 任务可能在 create_task 调用期间就完成

async def example():
print("开始")

loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)

task = asyncio.create_task(asyncio.sleep(0))
# 此时 task 可能已经开始执行或已完成

print("任务创建后")
await task
print("任务完成")

# 输出顺序可能与预期不同

何时使用

场景推荐
带缓存的异步操作✅ 推荐
快速完成的协程✅ 推荐
需要严格执行顺序⚠️ 谨慎
与旧代码兼容⚠️ 测试后使用

2. 控制并发数量

使用 Semaphore 避免过度并发:

import asyncio

semaphore = asyncio.Semaphore(10) # 最多 10 个并发

async def fetch(url):
async with semaphore:
# 执行请求
pass

async def main():
urls = [...] # 1000 个 URL
await asyncio.gather(*[fetch(url) for url in urls])

3. 批量处理

对于大量小任务,批量处理更高效:

import asyncio

async def process_batch(items):
"""处理一批数据"""
await asyncio.sleep(0.1)
return [item * 2 for item in items]

async def main():
items = list(range(1000))
batch_size = 100

results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
results.extend(await process_batch(batch))

3. 连接复用

复用 HTTP 连接和数据库连接:

import aiohttp

async def fetch_all(urls):
# 复用 session,避免重复创建连接
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.text()

return await asyncio.gather(*[fetch(url) for url in urls])

代码组织

1. 分离同步和异步代码

# utils.py - 同步工具函数
def parse_json(data):
return json.loads(data)

def format_output(data):
return json.dumps(data, indent=2)

# async_utils.py - 异步工具函数
async def fetch_json(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()

# main.py - 主程序
async def main():
data = await fetch_json("https://api.example.com")
parsed = parse_json(data) # 同步处理
print(format_output(parsed))

2. 使用依赖注入

import asyncio
from dataclasses import dataclass

@dataclass
class Config:
base_url: str
timeout: float = 10.0

class APIClient:
def __init__(self, config: Config):
self.config = config

async def fetch(self, endpoint):
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.config.base_url}{endpoint}",
timeout=aiohttp.ClientTimeout(self.config.timeout)
) as response:
return await response.json()

async def main():
config = Config(base_url="https://api.example.com")
client = APIClient(config)
data = await client.fetch("/users")

3. 清理资源

使用 try/finally 或上下文管理器确保资源清理:

import asyncio

class AsyncResource:
async def __aenter__(self):
print("获取资源")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")

async def do_something(self):
print("使用资源")

async def main():
async with AsyncResource() as resource:
await resource.do_something()

调试技巧

1. 启用调试模式

import asyncio

async def main():
# 开启调试模式
asyncio.get_running_loop().set_debug(True)

# 或在运行时
asyncio.run(main(), debug=True)

调试模式会:

  • 检测未 await 的协程
  • 报告长时间阻塞的操作
  • 显示更详细的错误信息

2. 记录任务状态

import asyncio

async def debug_tasks():
"""打印所有任务状态"""
tasks = asyncio.all_tasks()
for task in tasks:
print(f"任务: {task.get_name()}, 状态: {task._state}")

3. 使用 logging

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def worker(n):
logger.debug(f"Worker {n} 开始")
await asyncio.sleep(1)
logger.debug(f"Worker {n} 完成")

测试异步代码

使用 pytest-asyncio

# test_example.py
import pytest

@pytest.mark.asyncio
async def test_async_function():
result = await some_async_function()
assert result == expected_value

@pytest.mark.asyncio
async def test_with_timeout():
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_function(), timeout=1.0)

模拟异步函数

from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
with patch('module.async_function') as mock:
mock.return_value = "mocked result"
result = await call_async_function()
assert result == "mocked result"

小结

类别最佳实践
避免阻塞使用 to_threadrun_in_executor
异常处理总是 await 任务,捕获异常
资源管理使用 async with,保存任务引用
并发控制使用 Semaphore 限制并发
代码组织分离同步/异步,使用依赖注入
调试启用调试模式,记录日志
测试使用 pytest-asyncio

遵循这些最佳实践,可以编写出高效、可靠、易维护的异步代码。