asyncio 速查表
本文档提供 asyncio 常用 API 的快速参考。
协程与任务
定义协程
import asyncio
# 定义协程函数
async def my_coro():
await asyncio.sleep(1)
return "结果"
# 运行协程
asyncio.run(my_coro())
创建任务
# 方式一:create_task(Python 3.7+)
task = asyncio.create_task(my_coro())
# 方式二:TaskGroup(Python 3.11+,推荐)
async with asyncio.TaskGroup() as tg:
task = tg.create_task(my_coro())
等待任务
# 等待单个任务
result = await task
# 并发等待多个任务
results = await asyncio.gather(task1, task2, task3)
# 按完成顺序处理
for coro in asyncio.as_completed([task1, task2, task3]):
result = await coro
# 等待特定条件
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.ALL_COMPLETED # 或 FIRST_COMPLETED, FIRST_EXCEPTION
)
超时控制
# wait_for(Python 3.7+)
try:
result = await asyncio.wait_for(my_coro(), timeout=5.0)
except asyncio.TimeoutError:
print("超时")
# timeout 上下文管理器(Python 3.11+)
try:
async with asyncio.timeout(5.0):
result = await my_coro()
except TimeoutError:
print("超时")
取消任务
task.cancel() # 请求取消
try:
await task
except asyncio.CancelledError:
print("任务被取消")
任务状态
| 方法/属性 | 说明 |
|---|---|
task.done() | 是否完成 |
task.cancelled() | 是否被取消 |
task.result() | 获取结果(已完成后) |
task.exception() | 获取异常(已完成后) |
task.get_name() | 获取任务名 |
task.set_name(name) | 设置任务名 |
异步操作
睡眠
await asyncio.sleep(1) # 睡眠 1 秒
await asyncio.sleep(0) # 让出控制权
在线程中执行
# Python 3.9+
result = await asyncio.to_thread(blocking_function, arg1, arg2)
# 通用方式
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)
同步原语
Lock
lock = asyncio.Lock()
async with lock:
# 临界区
pass
# 或手动管理
await lock.acquire()
try:
pass
finally:
lock.release()
Event
event = asyncio.Event()
# 等待事件
await event.wait()
# 设置/清除事件
event.set()
event.clear()
# 检查状态
event.is_set()
Condition
condition = asyncio.Condition()
async with condition:
await condition.wait() # 等待通知
condition.notify() # 通知一个
condition.notify_all() # 通知所有
Semaphore
semaphore = asyncio.Semaphore(3) # 最多 3 个并发
async with semaphore:
# 限制并发区域
pass
Barrier
# Python 3.11+
barrier = asyncio.Barrier(3) # 需要 3 个协程
await barrier.wait() # 等待所有协程到达
队列
基本用法
queue = asyncio.Queue(maxsize=10)
# 放入
await queue.put(item)
# 取出
item = await queue.get()
# 非阻塞
queue.put_nowait(item)
item = queue.get_nowait()
# 标记完成
queue.task_done()
# 等待所有任务完成
await queue.join()
队列类型
| 类型 | 说明 |
|---|---|
Queue | 先进先出(FIFO) |
PriorityQueue | 优先级队列 |
LifoQueue | 后进先出(LIFO) |
队列属性
| 方法/属性 | 说明 |
|---|---|
qsize() | 当前大小 |
empty() | 是否为空 |
full() | 是否已满 |
maxsize | 最大容量 |
异步上下文管理器
class AsyncContextManager:
async def __aenter__(self):
print("进入")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("退出")
return False
async with AsyncContextManager() as manager:
pass
异步迭代器
class AsyncIterator:
def __aiter__(self):
return self
async def __anext__(self):
await asyncio.sleep(0.1)
raise StopAsyncIteration
async for item in AsyncIterator():
pass
异步生成器
async def async_generator():
for i in range(3):
await asyncio.sleep(0.1)
yield i
async for value in async_generator():
print(value)
# 收集所有值
values = [v async for v in async_generator()]
网络操作
TCP 客户端
reader, writer = await asyncio.open_connection('host', 8888)
writer.write(b'Hello')
await writer.drain()
data = await reader.read(100)
writer.close()
await writer.wait_closed()
TCP 服务器
async def handle_client(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()
server = await asyncio.start_server(handle_client, 'localhost', 8888)
async with server:
await server.serve_forever()
UDP
transport, protocol = await asyncio.get_event_loop().create_datagram_endpoint(
asyncio.DatagramProtocol,
remote_addr=('host', 8888)
)
transport.sendto(b'Hello')
子进程
运行命令
# 简单方式
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
# 使用 shell
proc = await asyncio.create_subprocess_shell(
'ls -la',
stdout=asyncio.subprocess.PIPE
)
与子进程交互
proc = await asyncio.create_subprocess_exec(
'cat',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)
proc.stdin.write(b'Hello\n')
await proc.stdin.drain()
proc.stdin.close()
output = await proc.stdout.read()
信号处理
import signal
loop = asyncio.get_running_loop()
def handler():
print("收到信号")
loop.stop()
loop.add_signal_handler(signal.SIGINT, handler)
事件循环
获取事件循环
loop = asyncio.get_running_loop() # 获取正在运行的循环
loop = asyncio.get_event_loop() # 获取或创建循环(不推荐在新代码中使用)
低层 API
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(coro)
loop.run_forever()
loop.stop()
loop.close()
常用函数速查
| 函数 | 说明 |
|---|---|
asyncio.run(coro) | 运行协程(主入口) |
asyncio.create_task(coro) | 创建任务 |
asyncio.gather(*aws) | 并发执行并收集结果 |
asyncio.wait(aws) | 等待任务完成 |
asyncio.sleep(delay) | 异步睡眠 |
asyncio.wait_for(aw, timeout) | 带超时等待 |
asyncio.shield(aw) | 保护不被取消 |
asyncio.timeout(delay) | 超时上下文管理器 |
asyncio.to_thread(func) | 在线程中执行 |
asyncio.all_tasks() | 获取所有任务 |
asyncio.current_task() | 获取当前任务 |
异常类型
| 异常 | 说明 |
|---|---|
asyncio.CancelledError | 任务被取消 |
asyncio.TimeoutError | 操作超时 |
asyncio.InvalidStateError | 任务状态无效 |
类型检查
| 函数 | 说明 |
|---|---|
asyncio.iscoroutine(obj) | 是否是协程对象 |
asyncio.iscoroutinefunction(func) | 是否是协程函数 |
asyncio.isfuture(obj) | 是否是 Future |
asyncio.ensure_future(coro) | 确保是 Task/Future |
Python 版本特性
| 版本 | 新特性 |
|---|---|
| 3.7 | asyncio.run(), asyncio.create_task() |
| 3.8 | 任务命名 |
| 3.9 | asyncio.to_thread() |
| 3.10 | asyncio.TaskGroup 预览 |
| 3.11 | asyncio.TaskGroup, asyncio.timeout() |
| 3.12 | eager_task_factory, create_eager_task_factory() |
| 3.13 | as_completed() 支持异步迭代 |
| 3.14 | create_task() 添加 eager_start 参数 |
重要概念速查
shield - 保护任务不被取消
import asyncio
async def main():
task = asyncio.create_task(important_work())
try:
# 保护任务不被外部取消
result = await asyncio.shield(task)
except asyncio.CancelledError:
# 等待被取消,但任务仍在执行
result = await task
eager_task_factory - 即时任务执行(Python 3.12+)
import asyncio
async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)
# 任务立即执行,无需调度
task = asyncio.create_task(quick_coro())
as_completed 异步迭代(Python 3.13+)
import asyncio
async def main():
tasks = [asyncio.create_task(work(i)) for i in range(5)]
# Python 3.13+:异步迭代,返回原始任务
async for task in asyncio.as_completed(tasks):
result = await task
# 可以识别是哪个任务
推荐异步库
| 用途 | 库 |
|---|---|
| HTTP 客户端 | aiohttp, httpx |
| Web 框架 | FastAPI, Starlette, aiohttp |
| 数据库 | asyncpg, aiomysql, motor |
| Redis | aioredis |
| 任务队列 | arq, dramatiq |
| 文件操作 | aiofiles |