异步上下文管理器
异步上下文管理器允许我们在进入和退出代码块时执行异步操作。这对于管理需要异步初始化或清理的资源非常有用,例如数据库连接、网络会话、文件句柄等。
同步上下文管理器回顾
在讨论异步上下文管理器之前,先回顾同步上下文管理器的工作原理。
同步上下文管理器
同步上下文管理器实现 __enter__ 和 __exit__ 方法:
class Timer:
"""计时器上下文管理器"""
def __enter__(self):
import time
self.start = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
import time
elapsed = time.time() - self.start
print(f"耗时: {elapsed:.2f} 秒")
return False # 不抑制异常
# 使用 with 语句
with Timer():
# 执行一些操作
sum(range(1000000))
同步上下文管理器的限制
__enter__ 和 __exit__ 方法不能是异步的,无法在其中执行 await 操作:
class AsyncResource:
def __enter__(self):
# 不能在这里使用 await!
await self.connect() # SyntaxError
return self
对于需要异步初始化或清理的资源,就需要异步上下文管理器。
异步上下文管理器基础
基本语法
异步上下文管理器实现 __aenter__ 和 __aexit__ 方法:
import asyncio
class AsyncTimer:
"""异步计时器"""
async def __aenter__(self):
self.start = asyncio.get_event_loop().time()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
elapsed = asyncio.get_event_loop().time() - self.start
print(f"耗时: {elapsed:.2f} 秒")
return False
async def main():
async with AsyncTimer():
await asyncio.sleep(1)
print("执行中...")
asyncio.run(main())
输出:
执行中...
耗时: 1.00 秒
async with 语法
使用 async with 语句进入异步上下文管理器:
async def main():
# async with 自动调用 __aenter__ 和 __aexit__
async with AsyncTimer() as timer:
# timer 是 __aenter__ 的返回值
await asyncio.sleep(1)
print(f"开始时间: {timer.start}")
关键区别
| 特性 | 同步上下文管理器 | 异步上下文管理器 |
|---|---|---|
| 进入方法 | __enter__ | __aenter__ |
| 退出方法 | __exit__ | __aexit__ |
| 方法类型 | 同步方法 | 异步方法(async def) |
| 使用语法 | with obj: | async with obj: |
实现异步上下文管理器
手动实现类
对于复杂的资源管理,手动实现 __aenter__ 和 __aexit__:
import asyncio
class AsyncDatabaseConnection:
"""异步数据库连接"""
def __init__(self, host, port):
self.host = host
self.port = port
self._connection = None
async def __aenter__(self):
# 异步建立连接
print(f"连接到 {self.host}:{self.port}")
await asyncio.sleep(0.5) # 模拟连接过程
self._connection = "connected"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭连接
print("关闭连接")
await asyncio.sleep(0.1) # 模拟关闭过程
self._connection = None
# 处理异常
if exc_type:
print(f"发生异常: {exc_val}")
return False # 不抑制异常
async def query(self, sql):
"""执行查询"""
if not self._connection:
raise RuntimeError("未连接")
await asyncio.sleep(0.1) # 模拟查询
return f"查询结果: {sql}"
async def main():
async with AsyncDatabaseConnection("localhost", 5432) as db:
result = await db.query("SELECT * FROM users")
print(result)
asyncio.run(main())
输出:
连接到 localhost:5432
查询结果: SELECT * FROM users
关闭连接
使用 @asynccontextmanager 装饰器
contextlib.asynccontextmanager 提供了更简洁的实现方式(Python 3.7+):
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_timer(name):
"""使用装饰器实现的异步计时器"""
start = asyncio.get_event_loop().time()
print(f"{name} 开始")
try:
yield # yield 之前是 __aenter__ 的逻辑
# yield 之后是 __aexit__ 的逻辑
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"{name} 结束,耗时: {elapsed:.2f} 秒")
async def main():
async with async_timer("任务"):
await asyncio.sleep(1)
asyncio.run(main())
输出:
任务 开始
任务 结束,耗时: 1.00 秒
yield 返回值
yield 语句可以返回值,作为 as 后面的变量:
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_session():
"""创建数据库会话"""
session = {"id": 1, "connected": True}
print("创建会话")
try:
yield session # 返回给 async with ... as session
finally:
session["connected"] = False
print("关闭会话")
async def main():
async with get_session() as session:
print(f"使用会话: {session}")
asyncio.run(main())
输出:
创建会话
使用会话: {'id': 1, 'connected': True}
关闭会话
异常处理
aexit 的参数
__aexit__ 方法接收三个异常相关参数:
import asyncio
class ExceptionHandler:
"""异常处理示例"""
async def __aenter__(self):
print("进入上下文")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"退出上下文")
if exc_type:
print(f"异常类型: {exc_type.__name__}")
print(f"异常值: {exc_val}")
# 返回 True 会抑制异常
return False # 让异常继续传播
async def main():
try:
async with ExceptionHandler():
print("执行代码")
raise ValueError("测试异常")
except ValueError as e:
print(f"捕获异常: {e}")
asyncio.run(main())
抑制特定异常
返回 True 可以抑制异常:
import asyncio
class SuppressValueError:
"""抑制 ValueError 的上下文管理器"""
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is ValueError:
print(f"抑制 ValueError: {exc_val}")
return True # 抑制异常
return False
async def main():
async with SuppressValueError():
raise ValueError("这个异常会被抑制")
print("继续执行")
asyncio.run(main())
使用装饰器处理异常
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def error_handler():
"""捕获并记录异常"""
try:
yield
except ValueError as e:
print(f"捕获 ValueError: {e}")
except Exception as e:
print(f"捕获其他异常: {e}")
raise # 重新抛出
async def main():
async with error_handler():
raise ValueError("测试异常")
print("继续执行")
asyncio.run(main())
常见应用场景
数据库连接管理
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def database_transaction(pool):
"""数据库事务上下文管理器"""
# 从连接池获取连接
connection = await pool.acquire()
try:
# 开始事务
await connection.execute("BEGIN")
yield connection
# 提交事务
await connection.execute("COMMIT")
except Exception:
# 回滚事务
await connection.execute("ROLLBACK")
raise
finally:
# 释放连接
await pool.release(connection)
# 使用示例
async def main():
# pool = await create_pool()
async with database_transaction(pool) as conn:
await conn.execute("INSERT INTO users VALUES (1, 'Alice')")
await conn.execute("INSERT INTO orders VALUES (1, 100)")
# asyncio.run(main())
HTTP 会话管理
import asyncio
import aiohttp
from contextlib import asynccontextmanager
@asynccontextmanager
async def http_session():
"""HTTP 会话管理器"""
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()
async def fetch_data(url):
async with http_session() as session:
async with session.get(url) as response:
return await response.text()
async def main():
html = await fetch_data("https://example.com")
print(f"获取了 {len(html)} 字节")
# asyncio.run(main())
异步锁上下文
import asyncio
class AsyncLockContext:
"""异步锁上下文管理器"""
def __init__(self, lock=None):
self.lock = lock or asyncio.Lock()
async def __aenter__(self):
await self.lock.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
return False
# 实际上 asyncio.Lock 本身就支持 async with
async def main():
lock = asyncio.Lock()
# 方式一:直接使用
async with lock:
print("持有锁")
await asyncio.sleep(1)
# 方式二:自定义包装
async with AsyncLockContext(lock):
print("再次持有锁")
asyncio.run(main())
超时控制
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def timeout_context(seconds):
"""超时上下文管理器"""
try:
async with asyncio.timeout(seconds):
yield
except TimeoutError:
print(f"操作超时({seconds}秒)")
raise
async def main():
try:
async with timeout_context(2):
print("开始长时间操作")
await asyncio.sleep(5) # 这会超时
except TimeoutError:
print("处理超时")
asyncio.run(main())
Python 3.11+ 可以直接使用 asyncio.timeout:
async def main():
async with asyncio.timeout(2):
await asyncio.sleep(5)
嵌套和组合
嵌套 async with
多个异步上下文管理器可以嵌套:
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def resource(name):
print(f"获取 {name}")
yield name
print(f"释放 {name}")
async def main():
async with resource("A") as a:
async with resource("B") as b:
async with resource("C") as c:
print(f"使用 {a}, {b}, {c}")
输出:
获取 A
获取 B
获取 C
使用 A, B, C
释放 C
释放 B
释放 A
并行进入多个上下文
使用 contextlib.AsyncExitStack 可以动态管理多个上下文:
import asyncio
from contextlib import asynccontextmanager, AsyncExitStack
@asynccontextmanager
async def resource(name):
print(f"获取 {name}")
yield name
print(f"释放 {name}")
async def main():
async with AsyncExitStack() as stack:
# 动态添加多个上下文
resources = [
await stack.enter_async_context(resource(f"资源{i}"))
for i in range(3)
]
print(f"使用所有资源: {resources}")
# 退出时按相反顺序释放
asyncio.run(main())
输出:
获取 资源0
获取 资源1
获取 资源2
使用所有资源: ['资源0', '资源1', '资源2']
释放 资源2
释放 资源1
释放 资源0
实践示例
示例 1:异步文件操作
import asyncio
import aiofiles
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_open(filepath, mode='r'):
"""异步文件操作上下文管理器"""
async with aiofiles.open(filepath, mode) as file:
print(f"打开文件: {filepath}")
yield file
print(f"关闭文件: {filepath}")
async def main():
async with async_open('example.txt', 'w') as f:
await f.write("Hello, Async IO!")
async with async_open('example.txt', 'r') as f:
content = await f.read()
print(f"内容: {content}")
# asyncio.run(main())
示例 2:异步临时目录
import asyncio
import tempfile
import shutil
import os
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_tempdir():
"""异步临时目录上下文管理器"""
# 在线程池中创建临时目录(tempfile 是同步的)
temp_dir = await asyncio.to_thread(tempfile.mkdtemp)
print(f"创建临时目录: {temp_dir}")
try:
yield temp_dir
finally:
# 异步删除目录
await asyncio.to_thread(shutil.rmtree, temp_dir)
print(f"删除临时目录: {temp_dir}")
async def main():
async with async_tempdir() as temp:
# 在临时目录中工作
filepath = os.path.join(temp, 'test.txt')
print(f"工作目录: {temp}")
await asyncio.sleep(1)
asyncio.run(main())
示例 3:连接池
import asyncio
from contextlib import asynccontextmanager
class ConnectionPool:
"""简单的连接池"""
def __init__(self, max_size=5):
self.max_size = max_size
self.semaphore = asyncio.Semaphore(max_size)
self.connections = []
@asynccontextmanager
async def acquire(self):
"""获取连接"""
await self.semaphore.acquire()
connection = await self._create_connection()
try:
yield connection
finally:
await self._close_connection(connection)
self.semaphore.release()
async def _create_connection(self):
"""创建连接"""
await asyncio.sleep(0.1)
return f"connection-{id(self)}"
async def _close_connection(self, conn):
"""关闭连接"""
await asyncio.sleep(0.05)
async def worker(pool, worker_id):
"""工作协程"""
async with pool.acquire() as conn:
print(f"Worker {worker_id} 使用 {conn}")
await asyncio.sleep(0.5)
print(f"Worker {worker_id} 完成")
async def main():
pool = ConnectionPool(max_size=3)
# 启动 10 个工作协程,但最多同时使用 3 个连接
tasks = [worker(pool, i) for i in range(10)]
await asyncio.gather(*tasks)
asyncio.run(main())
示例 4:性能测量装饰器
import asyncio
from contextlib import asynccontextmanager
from functools import wraps
@asynccontextmanager
async def measure_time(operation_name):
"""测量异步操作耗时"""
start = asyncio.get_event_loop().time()
print(f"[{operation_name}] 开始")
try:
yield
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"[{operation_name}] 完成,耗时: {elapsed:.3f}秒")
def timed(operation_name):
"""装饰器:自动测量异步函数耗时"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with measure_time(operation_name):
return await func(*args, **kwargs)
return wrapper
return decorator
@timed("数据获取")
async def fetch_data():
await asyncio.sleep(1)
return "数据"
async def main():
result = await fetch_data()
print(f"结果: {result}")
asyncio.run(main())
小结
异步上下文管理器的核心概念:
| 概念 | 说明 |
|---|---|
__aenter__ | 进入上下文时调用的异步方法 |
__aexit__ | 退出上下文时调用的异步方法 |
async with | 使用异步上下文管理器的语法 |
@asynccontextmanager | 简化实现的装饰器 |
最佳实践:
- 使用
@asynccontextmanager装饰器简化代码 - 在
__aexit__中正确处理异常 - 使用
try/finally确保资源释放 - 考虑使用
AsyncExitStack动态管理多个上下文 - 为复杂资源(连接池、事务)实现专门的上下文管理器
异步上下文管理器是管理异步资源的优雅方式,让资源获取和释放的代码更加清晰和可靠。