跳到主要内容

asyncio 速查表

本文档提供 asyncio 常用 API 的快速参考。

协程与任务

定义协程

import asyncio

# 定义协程函数
async def my_coro():
await asyncio.sleep(1)
return "结果"

# 运行协程
asyncio.run(my_coro())

创建任务

# 方式一:create_task(Python 3.7+)
task = asyncio.create_task(my_coro())

# 方式二:TaskGroup(Python 3.11+,推荐)
async with asyncio.TaskGroup() as tg:
task = tg.create_task(my_coro())

等待任务

# 等待单个任务
result = await task

# 并发等待多个任务
results = await asyncio.gather(task1, task2, task3)

# 按完成顺序处理
for coro in asyncio.as_completed([task1, task2, task3]):
result = await coro

# 等待特定条件
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.ALL_COMPLETED # 或 FIRST_COMPLETED, FIRST_EXCEPTION
)

超时控制

# wait_for(Python 3.7+)
try:
result = await asyncio.wait_for(my_coro(), timeout=5.0)
except asyncio.TimeoutError:
print("超时")

# timeout 上下文管理器(Python 3.11+)
try:
async with asyncio.timeout(5.0):
result = await my_coro()
except TimeoutError:
print("超时")

取消任务

task.cancel()  # 请求取消

try:
await task
except asyncio.CancelledError:
print("任务被取消")

任务状态

方法/属性说明
task.done()是否完成
task.cancelled()是否被取消
task.result()获取结果(已完成后)
task.exception()获取异常(已完成后)
task.get_name()获取任务名
task.set_name(name)设置任务名

异步操作

睡眠

await asyncio.sleep(1)  # 睡眠 1 秒
await asyncio.sleep(0) # 让出控制权

在线程中执行

# Python 3.9+
result = await asyncio.to_thread(blocking_function, arg1, arg2)

# 通用方式
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, blocking_function)

同步原语

Lock

lock = asyncio.Lock()

async with lock:
# 临界区
pass

# 或手动管理
await lock.acquire()
try:
pass
finally:
lock.release()

Event

event = asyncio.Event()

# 等待事件
await event.wait()

# 设置/清除事件
event.set()
event.clear()

# 检查状态
event.is_set()

Condition

condition = asyncio.Condition()

async with condition:
await condition.wait() # 等待通知
condition.notify() # 通知一个
condition.notify_all() # 通知所有

Semaphore

semaphore = asyncio.Semaphore(3)  # 最多 3 个并发

async with semaphore:
# 限制并发区域
pass

Barrier

# Python 3.11+
barrier = asyncio.Barrier(3) # 需要 3 个协程

await barrier.wait() # 等待所有协程到达

队列

基本用法

queue = asyncio.Queue(maxsize=10)

# 放入
await queue.put(item)

# 取出
item = await queue.get()

# 非阻塞
queue.put_nowait(item)
item = queue.get_nowait()

# 标记完成
queue.task_done()

# 等待所有任务完成
await queue.join()

队列类型

类型说明
Queue先进先出(FIFO)
PriorityQueue优先级队列
LifoQueue后进先出(LIFO)

队列属性

方法/属性说明
qsize()当前大小
empty()是否为空
full()是否已满
maxsize最大容量

异步上下文管理器

class AsyncContextManager:
async def __aenter__(self):
print("进入")
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("退出")
return False

async with AsyncContextManager() as manager:
pass

异步迭代器

class AsyncIterator:
def __aiter__(self):
return self

async def __anext__(self):
await asyncio.sleep(0.1)
raise StopAsyncIteration

async for item in AsyncIterator():
pass

异步生成器

async def async_generator():
for i in range(3):
await asyncio.sleep(0.1)
yield i

async for value in async_generator():
print(value)

# 收集所有值
values = [v async for v in async_generator()]

网络操作

TCP 客户端

reader, writer = await asyncio.open_connection('host', 8888)

writer.write(b'Hello')
await writer.drain()

data = await reader.read(100)

writer.close()
await writer.wait_closed()

TCP 服务器

async def handle_client(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()

server = await asyncio.start_server(handle_client, 'localhost', 8888)
async with server:
await server.serve_forever()

UDP

transport, protocol = await asyncio.get_event_loop().create_datagram_endpoint(
asyncio.DatagramProtocol,
remote_addr=('host', 8888)
)

transport.sendto(b'Hello')

子进程

运行命令

# 简单方式
proc = await asyncio.create_subprocess_exec(
'ls', '-la',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()

# 使用 shell
proc = await asyncio.create_subprocess_shell(
'ls -la',
stdout=asyncio.subprocess.PIPE
)

与子进程交互

proc = await asyncio.create_subprocess_exec(
'cat',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE
)

proc.stdin.write(b'Hello\n')
await proc.stdin.drain()
proc.stdin.close()

output = await proc.stdout.read()

信号处理

import signal

loop = asyncio.get_running_loop()

def handler():
print("收到信号")
loop.stop()

loop.add_signal_handler(signal.SIGINT, handler)

事件循环

获取事件循环

loop = asyncio.get_running_loop()  # 获取正在运行的循环
loop = asyncio.get_event_loop() # 获取或创建循环(不推荐在新代码中使用)

低层 API

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

loop.run_until_complete(coro)
loop.run_forever()
loop.stop()
loop.close()

常用函数速查

函数说明
asyncio.run(coro)运行协程(主入口)
asyncio.create_task(coro)创建任务
asyncio.gather(*aws)并发执行并收集结果
asyncio.wait(aws)等待任务完成
asyncio.sleep(delay)异步睡眠
asyncio.wait_for(aw, timeout)带超时等待
asyncio.shield(aw)保护不被取消
asyncio.timeout(delay)超时上下文管理器
asyncio.to_thread(func)在线程中执行
asyncio.all_tasks()获取所有任务
asyncio.current_task()获取当前任务

异常类型

异常说明
asyncio.CancelledError任务被取消
asyncio.TimeoutError操作超时
asyncio.InvalidStateError任务状态无效

类型检查

函数说明
asyncio.iscoroutine(obj)是否是协程对象
asyncio.iscoroutinefunction(func)是否是协程函数
asyncio.isfuture(obj)是否是 Future
asyncio.ensure_future(coro)确保是 Task/Future

Python 版本特性

版本新特性
3.7asyncio.run(), asyncio.create_task()
3.8任务命名
3.9asyncio.to_thread()
3.10asyncio.TaskGroup 预览
3.11asyncio.TaskGroup, asyncio.timeout()
3.12eager_task_factory, create_eager_task_factory()
3.13as_completed() 支持异步迭代
3.14create_task() 添加 eager_start 参数

重要概念速查

shield - 保护任务不被取消

import asyncio

async def main():
task = asyncio.create_task(important_work())

try:
# 保护任务不被外部取消
result = await asyncio.shield(task)
except asyncio.CancelledError:
# 等待被取消,但任务仍在执行
result = await task

eager_task_factory - 即时任务执行(Python 3.12+)

import asyncio

async def main():
loop = asyncio.get_running_loop()
loop.set_task_factory(asyncio.eager_task_factory)

# 任务立即执行,无需调度
task = asyncio.create_task(quick_coro())

as_completed 异步迭代(Python 3.13+)

import asyncio

async def main():
tasks = [asyncio.create_task(work(i)) for i in range(5)]

# Python 3.13+:异步迭代,返回原始任务
async for task in asyncio.as_completed(tasks):
result = await task
# 可以识别是哪个任务

推荐异步库

用途
HTTP 客户端aiohttp, httpx
Web 框架FastAPI, Starlette, aiohttp
数据库asyncpg, aiomysql, motor
Redisaioredis
任务队列arq, dramatiq
文件操作aiofiles