跳到主要内容

异步上下文管理器

异步上下文管理器允许我们在进入和退出代码块时执行异步操作。这对于管理需要异步初始化或清理的资源非常有用,例如数据库连接、网络会话、文件句柄等。

同步上下文管理器回顾

在讨论异步上下文管理器之前,先回顾同步上下文管理器的工作原理。

同步上下文管理器

同步上下文管理器实现 __enter____exit__ 方法:

class Timer:
"""计时器上下文管理器"""

def __enter__(self):
import time
self.start = time.time()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
import time
elapsed = time.time() - self.start
print(f"耗时: {elapsed:.2f} 秒")
return False # 不抑制异常

# 使用 with 语句
with Timer():
# 执行一些操作
sum(range(1000000))

同步上下文管理器的限制

__enter____exit__ 方法不能是异步的,无法在其中执行 await 操作:

class AsyncResource:
def __enter__(self):
# 不能在这里使用 await!
await self.connect() # SyntaxError
return self

对于需要异步初始化或清理的资源,就需要异步上下文管理器。

异步上下文管理器基础

基本语法

异步上下文管理器实现 __aenter____aexit__ 方法:

import asyncio

class AsyncTimer:
"""异步计时器"""

async def __aenter__(self):
self.start = asyncio.get_event_loop().time()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
elapsed = asyncio.get_event_loop().time() - self.start
print(f"耗时: {elapsed:.2f} 秒")
return False

async def main():
async with AsyncTimer():
await asyncio.sleep(1)
print("执行中...")

asyncio.run(main())

输出:

执行中...
耗时: 1.00 秒

async with 语法

使用 async with 语句进入异步上下文管理器:

async def main():
# async with 自动调用 __aenter__ 和 __aexit__
async with AsyncTimer() as timer:
# timer 是 __aenter__ 的返回值
await asyncio.sleep(1)
print(f"开始时间: {timer.start}")

关键区别

特性同步上下文管理器异步上下文管理器
进入方法__enter____aenter__
退出方法__exit____aexit__
方法类型同步方法异步方法(async def
使用语法with obj:async with obj:

实现异步上下文管理器

手动实现类

对于复杂的资源管理,手动实现 __aenter____aexit__

import asyncio

class AsyncDatabaseConnection:
"""异步数据库连接"""

def __init__(self, host, port):
self.host = host
self.port = port
self._connection = None

async def __aenter__(self):
# 异步建立连接
print(f"连接到 {self.host}:{self.port}")
await asyncio.sleep(0.5) # 模拟连接过程
self._connection = "connected"
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭连接
print("关闭连接")
await asyncio.sleep(0.1) # 模拟关闭过程
self._connection = None

# 处理异常
if exc_type:
print(f"发生异常: {exc_val}")

return False # 不抑制异常

async def query(self, sql):
"""执行查询"""
if not self._connection:
raise RuntimeError("未连接")

await asyncio.sleep(0.1) # 模拟查询
return f"查询结果: {sql}"

async def main():
async with AsyncDatabaseConnection("localhost", 5432) as db:
result = await db.query("SELECT * FROM users")
print(result)

asyncio.run(main())

输出:

连接到 localhost:5432
查询结果: SELECT * FROM users
关闭连接

使用 @asynccontextmanager 装饰器

contextlib.asynccontextmanager 提供了更简洁的实现方式(Python 3.7+):

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(name):
"""使用装饰器实现的异步计时器"""
start = asyncio.get_event_loop().time()
print(f"{name} 开始")

try:
yield # yield 之前是 __aenter__ 的逻辑
# yield 之后是 __aexit__ 的逻辑
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"{name} 结束,耗时: {elapsed:.2f} 秒")

async def main():
async with async_timer("任务"):
await asyncio.sleep(1)

asyncio.run(main())

输出:

任务 开始
任务 结束,耗时: 1.00 秒

yield 返回值

yield 语句可以返回值,作为 as 后面的变量:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_session():
"""创建数据库会话"""
session = {"id": 1, "connected": True}
print("创建会话")

try:
yield session # 返回给 async with ... as session
finally:
session["connected"] = False
print("关闭会话")

async def main():
async with get_session() as session:
print(f"使用会话: {session}")

asyncio.run(main())

输出:

创建会话
使用会话: {'id': 1, 'connected': True}
关闭会话

异常处理

aexit 的参数

__aexit__ 方法接收三个异常相关参数:

import asyncio

class ExceptionHandler:
"""异常处理示例"""

async def __aenter__(self):
print("进入上下文")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"退出上下文")

if exc_type:
print(f"异常类型: {exc_type.__name__}")
print(f"异常值: {exc_val}")
# 返回 True 会抑制异常
return False # 让异常继续传播

async def main():
try:
async with ExceptionHandler():
print("执行代码")
raise ValueError("测试异常")
except ValueError as e:
print(f"捕获异常: {e}")

asyncio.run(main())

抑制特定异常

返回 True 可以抑制异常:

import asyncio

class SuppressValueError:
"""抑制 ValueError 的上下文管理器"""

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type is ValueError:
print(f"抑制 ValueError: {exc_val}")
return True # 抑制异常
return False

async def main():
async with SuppressValueError():
raise ValueError("这个异常会被抑制")

print("继续执行")

asyncio.run(main())

使用装饰器处理异常

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def error_handler():
"""捕获并记录异常"""
try:
yield
except ValueError as e:
print(f"捕获 ValueError: {e}")
except Exception as e:
print(f"捕获其他异常: {e}")
raise # 重新抛出

async def main():
async with error_handler():
raise ValueError("测试异常")

print("继续执行")

asyncio.run(main())

常见应用场景

数据库连接管理

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_transaction(pool):
"""数据库事务上下文管理器"""
# 从连接池获取连接
connection = await pool.acquire()

try:
# 开始事务
await connection.execute("BEGIN")
yield connection
# 提交事务
await connection.execute("COMMIT")
except Exception:
# 回滚事务
await connection.execute("ROLLBACK")
raise
finally:
# 释放连接
await pool.release(connection)

# 使用示例
async def main():
# pool = await create_pool()
async with database_transaction(pool) as conn:
await conn.execute("INSERT INTO users VALUES (1, 'Alice')")
await conn.execute("INSERT INTO orders VALUES (1, 100)")

# asyncio.run(main())

HTTP 会话管理

import asyncio
import aiohttp
from contextlib import asynccontextmanager

@asynccontextmanager
async def http_session():
"""HTTP 会话管理器"""
session = aiohttp.ClientSession()
try:
yield session
finally:
await session.close()

async def fetch_data(url):
async with http_session() as session:
async with session.get(url) as response:
return await response.text()

async def main():
html = await fetch_data("https://example.com")
print(f"获取了 {len(html)} 字节")

# asyncio.run(main())

异步锁上下文

import asyncio

class AsyncLockContext:
"""异步锁上下文管理器"""

def __init__(self, lock=None):
self.lock = lock or asyncio.Lock()

async def __aenter__(self):
await self.lock.acquire()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
return False

# 实际上 asyncio.Lock 本身就支持 async with
async def main():
lock = asyncio.Lock()

# 方式一:直接使用
async with lock:
print("持有锁")
await asyncio.sleep(1)

# 方式二:自定义包装
async with AsyncLockContext(lock):
print("再次持有锁")

asyncio.run(main())

超时控制

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def timeout_context(seconds):
"""超时上下文管理器"""
try:
async with asyncio.timeout(seconds):
yield
except TimeoutError:
print(f"操作超时({seconds}秒)")
raise

async def main():
try:
async with timeout_context(2):
print("开始长时间操作")
await asyncio.sleep(5) # 这会超时
except TimeoutError:
print("处理超时")

asyncio.run(main())

Python 3.11+ 可以直接使用 asyncio.timeout

async def main():
async with asyncio.timeout(2):
await asyncio.sleep(5)

嵌套和组合

嵌套 async with

多个异步上下文管理器可以嵌套:

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource(name):
print(f"获取 {name}")
yield name
print(f"释放 {name}")

async def main():
async with resource("A") as a:
async with resource("B") as b:
async with resource("C") as c:
print(f"使用 {a}, {b}, {c}")

输出:

获取 A
获取 B
获取 C
使用 A, B, C
释放 C
释放 B
释放 A

并行进入多个上下文

使用 contextlib.AsyncExitStack 可以动态管理多个上下文:

import asyncio
from contextlib import asynccontextmanager, AsyncExitStack

@asynccontextmanager
async def resource(name):
print(f"获取 {name}")
yield name
print(f"释放 {name}")

async def main():
async with AsyncExitStack() as stack:
# 动态添加多个上下文
resources = [
await stack.enter_async_context(resource(f"资源{i}"))
for i in range(3)
]

print(f"使用所有资源: {resources}")

# 退出时按相反顺序释放

asyncio.run(main())

输出:

获取 资源0
获取 资源1
获取 资源2
使用所有资源: ['资源0', '资源1', '资源2']
释放 资源2
释放 资源1
释放 资源0

实践示例

示例 1:异步文件操作

import asyncio
import aiofiles
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_open(filepath, mode='r'):
"""异步文件操作上下文管理器"""
async with aiofiles.open(filepath, mode) as file:
print(f"打开文件: {filepath}")
yield file
print(f"关闭文件: {filepath}")

async def main():
async with async_open('example.txt', 'w') as f:
await f.write("Hello, Async IO!")

async with async_open('example.txt', 'r') as f:
content = await f.read()
print(f"内容: {content}")

# asyncio.run(main())

示例 2:异步临时目录

import asyncio
import tempfile
import shutil
import os
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_tempdir():
"""异步临时目录上下文管理器"""
# 在线程池中创建临时目录(tempfile 是同步的)
temp_dir = await asyncio.to_thread(tempfile.mkdtemp)
print(f"创建临时目录: {temp_dir}")

try:
yield temp_dir
finally:
# 异步删除目录
await asyncio.to_thread(shutil.rmtree, temp_dir)
print(f"删除临时目录: {temp_dir}")

async def main():
async with async_tempdir() as temp:
# 在临时目录中工作
filepath = os.path.join(temp, 'test.txt')
print(f"工作目录: {temp}")
await asyncio.sleep(1)

asyncio.run(main())

示例 3:连接池

import asyncio
from contextlib import asynccontextmanager

class ConnectionPool:
"""简单的连接池"""

def __init__(self, max_size=5):
self.max_size = max_size
self.semaphore = asyncio.Semaphore(max_size)
self.connections = []

@asynccontextmanager
async def acquire(self):
"""获取连接"""
await self.semaphore.acquire()

connection = await self._create_connection()
try:
yield connection
finally:
await self._close_connection(connection)
self.semaphore.release()

async def _create_connection(self):
"""创建连接"""
await asyncio.sleep(0.1)
return f"connection-{id(self)}"

async def _close_connection(self, conn):
"""关闭连接"""
await asyncio.sleep(0.05)

async def worker(pool, worker_id):
"""工作协程"""
async with pool.acquire() as conn:
print(f"Worker {worker_id} 使用 {conn}")
await asyncio.sleep(0.5)
print(f"Worker {worker_id} 完成")

async def main():
pool = ConnectionPool(max_size=3)

# 启动 10 个工作协程,但最多同时使用 3 个连接
tasks = [worker(pool, i) for i in range(10)]
await asyncio.gather(*tasks)

asyncio.run(main())

示例 4:性能测量装饰器

import asyncio
from contextlib import asynccontextmanager
from functools import wraps

@asynccontextmanager
async def measure_time(operation_name):
"""测量异步操作耗时"""
start = asyncio.get_event_loop().time()
print(f"[{operation_name}] 开始")

try:
yield
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"[{operation_name}] 完成,耗时: {elapsed:.3f}秒")

def timed(operation_name):
"""装饰器:自动测量异步函数耗时"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
async with measure_time(operation_name):
return await func(*args, **kwargs)
return wrapper
return decorator

@timed("数据获取")
async def fetch_data():
await asyncio.sleep(1)
return "数据"

async def main():
result = await fetch_data()
print(f"结果: {result}")

asyncio.run(main())

小结

异步上下文管理器的核心概念:

概念说明
__aenter__进入上下文时调用的异步方法
__aexit__退出上下文时调用的异步方法
async with使用异步上下文管理器的语法
@asynccontextmanager简化实现的装饰器

最佳实践:

  1. 使用 @asynccontextmanager 装饰器简化代码
  2. __aexit__ 中正确处理异常
  3. 使用 try/finally 确保资源释放
  4. 考虑使用 AsyncExitStack 动态管理多个上下文
  5. 为复杂资源(连接池、事务)实现专门的上下文管理器

异步上下文管理器是管理异步资源的优雅方式,让资源获取和释放的代码更加清晰和可靠。