跳到主要内容

Python 异步编程

asyncio 是 Python 用于编写并发代码的库,使用 async/await 语法。它是构建高性能网络服务器、数据库连接库、分布式任务队列等框架的基础。

官方文档参考

本文档内容基于 Python 官方 asyncio 文档概念概述

什么是异步编程?

异步编程是一种编程范式,允许程序在等待某些操作(如I/O操作、网络请求)完成时继续执行其他任务。与传统的同步编程相比,异步编程可以显著提高程序的效率和响应性。

为什么需要异步编程?

在同步编程中,当程序执行一个耗时的I/O操作时,整个程序会被阻塞,直到操作完成。这意味着如果有大量并发请求,程序需要创建大量线程,消耗大量内存资源。

异步编程通过以下方式解决这一问题:

  • 单线程并发:使用事件循环在单个线程内管理多个并发任务
  • 非阻塞I/O:当一个任务等待I/O时,事件循环可以执行其他任务
  • 资源高效:无需为每个并发任务创建单独的操作系统线程

同步 vs 异步

# 同步执行 - 每个任务等待完成后再执行下一个
import time

def fetch_data_sync():
time.sleep(1)
return "数据1"

def main_sync():
result1 = fetch_data_sync()
result2 = fetch_data_sync()
print(result1, result2)

main_sync() # 约 2 秒

# 异步执行 - 任务可以并发运行
import asyncio

async def fetch_data_async():
await asyncio.sleep(1)
return "数据1"

async def main_async():
# 创建任务并发执行
task1 = asyncio.create_task(fetch_data_async())
task2 = asyncio.create_task(fetch_data_async())

result1 = await task1
result2 = await task2
print(result1, result2)

asyncio.run(main_async()) # 约 1 秒

核心概念

理解 asyncio 的核心概念对于编写正确的异步代码至关重要。根据 Python 官方文档,asyncio 主要包含以下核心概念:

事件循环(Event Loop)

事件循环是 asyncio 的核心,负责调度协程的执行。它就像一个交响乐团的指挥家,管理着所有待执行的任务。

事件循环的工作原理可以理解为以下几个步骤的循环往复:

  1. 任务注册:当创建一个协程或任务时,它被添加到事件循环的任务队列中等待执行
  2. 任务调度:事件循环从任务队列中取出待执行的任务
  3. 执行任务:执行任务的代码,直到遇到 await 语句需要等待某个操作完成
  4. 等待操作:当任务需要等待 I/O 操作(如网络请求、文件读写)时,事件循环会挂起该任务
  5. 切换任务:在等待期间,事件循环可以切换到其他已就绪的任务继续执行
  6. 任务完成:当等待的操作完成后,任务被重新放回任务队列,等待下次执行
任务队列


┌─────────────────────────────────────┐
│ 事件循环主循环 │
│ │
│ 1. 从队列取任务 │
│ 2. 执行任务直到 await │
│ 3. 如果需要等待 I/O, │
│ 将任务挂起,切换到其他任务 │
│ 4. I/O 就绪后,重新调度任务 │
└─────────────────────────────────────┘


任务完成,返回结果

事件循环的主要职责:

  1. 管理待执行任务:维护一个任务队列
  2. 调度执行:决定何时执行哪个任务
  3. 处理I/O:在等待I/O时切换到其他任务
  4. 回调处理:执行任务完成后的回调函数
import asyncio

# 获取当前正在运行的事件循环
loop = asyncio.get_running_loop()
print(f"当前事件循环: {loop}")

# 创建新的事件循环(不常用,推荐使用 asyncio.run)
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)

async/await 语法

async def:定义协程函数(Coroutine Function)

import asyncio

# async def 定义协程函数
async def greet():
print("Hello!")
await asyncio.sleep(1) # 暂停协程
print("World!")

# 运行协程

```python
import asyncio

# async def 定义协程函数
async def greet():
print("Hello!")
await asyncio.sleep(1) # 暂停协程
print("World!")

# 运行协程
asyncio.run(greet())

协程(Coroutine)

协程是可以暂停和恢复的函数。这是 asyncio 最核心的概念之一。

根据 Python 官方文档,协程具有以下特性:

  1. 可暂停:协程在执行过程中可以暂停,将控制权交还给事件循环
  2. 可恢复:暂停的协程可以在之前暂停的地方继续执行
  3. 可等待:协程是 awaitable 对象,可以在其他协程中等待
import asyncio

async def say_after(delay, what):
await asyncio.sleep(delay)
return what

# 创建协程对象 - 注意:只是创建,不会自动执行
coro = say_after(1, "Hello")
print(type(coro)) # <class 'coroutine'>

# 运行协程
result = asyncio.run(coro)
print(result) # Hello

重要:协程 vs 协程函数

根据官方文档,"协程"一词可以指两个相关但不同的概念:

  • 协程函数:使用 async def 定义的函数
  • 协程对象:调用协程函数返回的对象
# 协程函数
async def my_async_function():
pass

# 协程对象(调用协程函数返回)
coro = my_async_function()
常见错误:协程未被等待

调用协程函数只会创建一个协程对象,不会自动执行。如果不等待协程对象,Python 会发出 RuntimeWarning:

async def test():
print("never scheduled")

async def main():
test() # 警告:协程未被等待!

asyncio.run(main())
# 输出:RuntimeWarning: coroutine 'test' was never awaited

Awaitables(可等待对象)

Python 中有三种主要类型的可等待对象:

  1. 协程(Coroutines):async 函数返回的对象
  2. 任务(Tasks):被调度执行的协程
  3. Futures:代表尚未完成的计算
import asyncio

async def coro():
return "hello"

# 协程是可等待对象
async def main():
# 方式1:直接 await 协程
result = await coro()

# 方式2:包装成任务
task = asyncio.create_task(coro())
result = await task

asyncio.run(main())

任务(Task)

任务是协程的封装,由事件循环调度执行。创建任务后,任务会自动被调度。

import asyncio

async def my_coroutine():
await asyncio.sleep(1)
return "完成"

async def main():
# 创建任务 - 任务会立即被调度执行
task = asyncio.create_task(my_coroutine())

# 在等待任务完成的同时可以执行其他操作
print("任务已创建,等待完成...")

# 等待任务完成
result = await task
print(f"结果: {result}")

asyncio.run(main())

任务 vs 协程的区别

import asyncio

async def coro():
print("协程开始")
await asyncio.sleep(1)
print("协程结束")
return "协程结果"

async def main():
# 仅创建协程对象 - 不会执行
c = coro()
print("协程对象已创建,但未执行")

# 创建任务 - 会立即调度执行
task = asyncio.create_task(coro())
print("任务已创建并调度")

await task

asyncio.run(main())

await 关键字详解

await 是 Python 的关键字,用于等待可等待对象完成。根据 官方文档,await 的行为取决于所等待的对象类型:

  • 等待协程:暂停当前协程,等待被等待的协程完成
  • 等待任务:将控制权交还给事件循环,允许其他任务执行
  • 等待 Future:类似等待任务
import asyncio

async def coro_a():
print("coro_a 开始")
await asyncio.sleep(1)
print("coro_a 结束")
return "A"

async def coro_b():
print("coro_b 开始")
await asyncio.sleep(0.5)
print("coro_b 结束")
return "B"

async def main():
# await 等待协程
result = await coro_a()
print(f"得到结果: {result}")

# 使用 create_task 后再 await - 控制权会交还给事件循环
task = asyncio.create_task(coro_b())
print("任务已创建,我可以做其他事情")

result = await task
print(f"任务结果: {result}")

asyncio.run(main())

事件循环(Event Loop)

事件循环是 asyncio 的核心,负责调度协程的执行:

import asyncio

# 方式1:推荐方式(Python 3.7+)
async def main():
print("Hello")

asyncio.run(main())

# 方式2:手动管理事件循环
# 获取当前事件循环
loop = asyncio.get_event_loop()

# 创建任务
async def main():
print("Hello")

loop.run_until_complete(main())

# 方式3:创建新的事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
推荐使用 asyncio.run()

asyncio.run() 是运行异步程序的推荐方式,它会自动创建事件循环并在完成后清理资源。

基础用法

asyncio.run()

启动主协程(Python 3.7+):

import asyncio

async def main():
print("Hello")

asyncio.run(main()) # 推荐方式

await 表达式

await 用于等待协程完成:

import asyncio

async def fetch():
await asyncio.sleep(1)
return "数据"

async def main():
result = await fetch() # 等待 fetch 完成
print(result)

asyncio.run(main())

asyncio.create_task()

创建并发任务:

import asyncio

async def task1():
await asyncio.sleep(1)
return "任务1"

async def task2():
await asyncio.sleep(2)
return "任务2"

async def main():
# 创建任务(立即调度执行)
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())

# 等待所有任务完成
result1 = await t1
result2 = await t2
print(result1, result2)

asyncio.run(main()) # 约 2 秒完成

并发运行

asyncio.gather()

并发运行多个协程:

import asyncio

async def fetch_data(delay, name):
await asyncio.sleep(delay)
return f"{name} 完成"

async def main():
results = await asyncio.gather(
fetch_data(1, "任务1"),
fetch_data(2, "任务2"),
fetch_data(3, "任务3")
)
for result in results:
print(result)

asyncio.run(main())

asyncio.wait()

等待一组任务完成:

import asyncio

async def worker(name, delay):
await asyncio.sleep(delay)
return f"{name} done"

async def main():
tasks = [
asyncio.create_task(worker("A", 1)),
asyncio.create_task(worker("B", 2)),
asyncio.create_task(worker("C", 3))
]

# 等待完成(返回 done, pending)
done, pending = await asyncio.wait(tasks)

for task in done:
print(await task)

asyncio.run(main())

asyncio.wait_for()

设置超时:

import asyncio

async def long_task():
await asyncio.sleep(10)
return "完成"

async def main():
try:
# 最多等待 3 秒
result = await asyncio.wait_for(long_task(), timeout=3)
except asyncio.TimeoutError:
print("任务超时")

asyncio.run(main())

asyncio.as_completed()

按完成顺序获取结果:

import asyncio

async def task(name, delay):
await asyncio.sleep(delay)
return f"{name}: {delay}s"

async def main():
tasks = [
asyncio.create_task(task("A", 3)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 2))
]

for coro in asyncio.as_completed(tasks):
result = await coro
print(result)

asyncio.run(main())
# 输出顺序:B: 1s, C: 2s, A: 3s(按完成顺序)

异步上下文管理器

async with

import asyncio

class AsyncResource:
async def __aenter__(self):
print("获取资源")
await asyncio.sleep(0.1)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print("释放资源")
await asyncio.sleep(0.1)

async def main():
async with AsyncResource() as resource:
print("使用资源")

asyncio.run(main())

asyncio.timeout()(Python 3.11+)

import asyncio

async def long_operation():
await asyncio.sleep(10)
return "完成"

async def main():
try:
async with asyncio.timeout(3):
result = await long_operation()
except asyncio.TimeoutError:
print("操作超时")

asyncio.run(main())

异步迭代器

async for

import asyncio

class AsyncCounter:
def __init__(self, limit):
self.current = 0
self.limit = limit

def __aiter__(self):
return self

async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
self.current += 1
await asyncio.sleep(0.1) # 模拟异步操作
return self.current

async def main():
async for i in AsyncCounter(5):
print(i)

asyncio.run(main())

异步生成器

import asyncio

async def async_range(start, stop):
"""异步生成器"""
for i in range(start, stop):
await asyncio.sleep(0.1)
yield i

async def main():
async for i in async_range(0, 5):
print(i)

asyncio.run(main())

异步列表推导式

import asyncio

async def double(x):
await asyncio.sleep(0.1)
return x * 2

async def main():
# 错误:不能在列表推导式中使用 await
# result = [await double(x) for x in range(5)]

# 正确:使用 asyncio.gather
results = await asyncio.gather(*[double(x) for x in range(5)])
print(results) # [0, 2, 4, 6, 8]

# 或者使用列表推导式(对于同步函数)
result = [x * 2 for x in range(5)]
print(result)

asyncio.run(main())

并发任务管理

TaskGroup(Python 3.11+)

import asyncio

async def worker(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成"

async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(worker("A", 1))
task2 = tg.create_task(worker("B", 2))

print(task1.result())
print(task2.result())

asyncio.run(main())

取消任务

import asyncio

async def long_task():
try:
while True:
print("工作中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("任务被取消")
raise

async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(3)
task.cancel() # 取消任务
try:
await task
except asyncio.CancelledError:
print("已处理取消")

asyncio.run(main())

异步队列

asyncio.Queue

import asyncio

async def producer(queue, n):
for i in range(n):
await asyncio.sleep(0.5)
await queue.put(i)
print(f"生产: {i}")
await queue.put(None) # 发送结束信号

async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f"消费: {item}")
queue.task_done()

async def main():
queue = asyncio.Queue()

# 同时运行生产者和消费者
await asyncio.gather(
producer(queue, 5),
consumer(queue)
)

asyncio.run(main())

实际应用

异步 HTTP 请求

使用 aiohttp:

import asyncio
import aiohttp

async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()

async def main():
async with aiohttp.ClientSession() as session:
urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers"
]

tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)

for url, result in zip(urls, results):
print(f"{url}: {len(result)} bytes")

asyncio.run(main())

异步文件操作

使用 aiofiles:

import asyncio
import aiofiles

async def read_file(filepath):
async with aiofiles.open(filepath, 'r') as f:
content = await f.read()
return content

async def write_file(filepath, content):
async with aiofiles.open(filepath, 'w') as f:
await f.write(content)

async def main():
# 读取多个文件
files = ["file1.txt", "file2.txt", "file3.txt"]
tasks = [read_file(f) for f in files]
contents = await asyncio.gather(*tasks)

# 写入合并文件
combined = "\n".join(contents)
await write_file("combined.txt", combined)

asyncio.run(main())

异步数据库操作

使用 aiomysql:

import asyncio
import aiomysql

async def main():
pool = await aiomysql.create_pool(
host='localhost',
user='root',
password='',
db='test'
)

async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM users")
rows = await cur.fetchall()
for row in rows:
print(row)

pool.close()
await pool.wait_closed()

asyncio.run(main())

常见模式

重试机制

import asyncio

async def retry(coro, max_retries=3, delay=1):
for attempt in range(max_retries):
try:
return await coro
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"重试 {attempt + 1}/{max_retries}")
await asyncio.sleep(delay)

async def unstable():
import random
if random.random() < 0.7:
raise ValueError("随机失败")
return "成功"

async def main():
result = await retry(unstable())
print(result)

asyncio.run(main())

信号量控制并发

import asyncio

async def worker(semaphore, task_id):
async with semaphore: # 限制并发数
print(f"任务 {task_id} 开始")
await asyncio.sleep(1)
print(f"任务 {task_id} 完成")

async def main():
semaphore = asyncio.Semaphore(3) # 最多 3 个并发

tasks = [worker(semaphore, i) for i in range(10)]
await asyncio.gather(*tasks)

asyncio.run(main())

进度跟踪

import asyncio

async def process_item(item, progress):
await asyncio.sleep(0.5)
progress["completed"] += 1
print(f"完成: {progress['completed']}/{progress['total']}")

async def main():
items = range(10)
progress = {"total": len(items), "completed": 0}

tasks = [process_item(i, progress) for i in items]
await asyncio.gather(*tasks)

asyncio.run(main())

常见陷阱与最佳实践

根据 Python 官方文档,编写正确的 asyncio 代码需要了解一些常见的陷阱。

1. 避免阻塞操作

这是 asyncio 编程中最重要的原则。在异步代码中使用阻塞操作会阻塞整个事件循环,导致其他协程无法执行。

import time
import asyncio

# 错误:在协程中使用阻塞操作
async def bad_example():
time.sleep(1) # 阻塞整个事件循环 1 秒!
return "完成"

# 正确:使用异步版本
async def good_example():
await asyncio.sleep(1) # 不会阻塞事件循环
return "完成"

# 如果必须使用阻塞代码,使用 to_thread 将其放到线程池中执行
async def example():
# 阻塞操作会在单独的线程中执行
result = await asyncio.to_thread(blocking_function, arg)

常见的阻塞操作需要替换

阻塞操作异步替代
time.sleep()asyncio.sleep()
requests.get()aiohttp.ClientSession.get()
open(), read(), write()aiofiles 库的异步文件操作
socket.recv()asyncio.StreamReader
threading.Lock()asyncio.Lock()
# 正确的做法:对于同步库使用 run_in_executor
import asyncio
import requests # 同步库

async def fetch_url(url):
loop = asyncio.get_event_loop()
# 将阻塞的 HTTP 请求放到线程池中执行
response = await loop.run_in_executor(None, lambda: requests.get(url))
return response.text

2. 正确的异常处理

asyncio 中的异常处理有一些特殊情况需要注意:

import asyncio

async def may_fail():
await asyncio.sleep(1)
raise ValueError("错误")

async def main():
# 基础异常处理
try:
await may_fail()
except ValueError as e:
print(f"捕获异常: {e}")

# gather 中的异常处理 - 默认行为
# 如果任何一个任务失败,gather 会立即抛出异常
try:
await asyncio.gather(
may_fail(), # 会失败
asyncio.sleep(1),
)
except ValueError as e:
print("gather 中有任务失败")

# gather 中的异常处理 - 使用 return_exceptions
# return_exceptions=True 会捕获所有异常而不是抛出
results = await asyncio.gather(
may_fail(),
asyncio.sleep(1),
return_exceptions=True # 不抛出异常,将异常作为结果返回
)
for r in results:
if isinstance(r, Exception):
print(f"任务异常: {r}")
else:
print(f"任务结果: {r}")

asyncio.run(main())

TaskGroup 的异常处理(Python 3.11+)

import asyncio

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(asyncio.sleep(1))
tg.create_task(asyncio.sleep(0.5, result="完成"))
# 如果任一任务抛出异常,TaskGroup 会取消其他所有任务
except* Exception as eg:
for exc in eg.exceptions:
print(f"捕获异常: {exc}")

asyncio.run(main())

3. 检测未等待的协程

根据 官方文档,未等待的协程会导致警告:

import asyncio

async def test():
print("这个协程永远不会被执行")

async def main():
test() # 警告:coroutine 'test' was never awaited

# 启用调试模式可以看到更多信息
asyncio.run(main(), debug=True)

正确做法

async def main():
# 方式1:await 协程
await test()

# 方式2:创建任务
asyncio.create_task(test())

# 方式3:使用 asyncio.run
await asyncio.gather(test())

4. 避免死锁

死锁是异步编程中常见的问题。以下是避免死锁的最佳实践:

import asyncio

# 正确:使用异步锁
lock = asyncio.Lock()

async def worker():
async with lock: # 正确:使用 async with
await asyncio.sleep(1)
print("工作完成")

# 避免死锁的原则:
# 1. 尽量缩短持有锁的时间
# 2. 避免在持有锁时 await 外部代码
# 3. 考虑使用信号量代替锁

# 更好的方式:使用信号量限制并发
semaphore = asyncio.Semaphore(3) # 最多 3 个并发

async def worker(id):
async with semaphore:
print(f"Worker {id} 开始")
await asyncio.sleep(1)
print(f"Worker {id} 结束")

5. 调试模式

asyncio 提供了调试模式,可以帮助发现潜在问题:

import asyncio

# 启用调试模式的方式:
# 1. 环境变量
# PYTHONASYNCIODEBUG=1 python script.py

# 2. 程序中设置
async def main():
asyncio.set_debug(True)
# 你的代码

# 3. asyncio.run 参数
asyncio.run(main(), debug=True)

# 调试模式会检测:
# - 协程执行时间过长
# - 未等待的协程
# - 线程不安全的操作

6. 正确关闭事件循环

import asyncio

async def main():
await asyncio.sleep(1)

# 方式1:推荐
asyncio.run(main())

# 方式2:手动管理(需要正确清理)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
# 取消所有待执行的任务
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
# 等待所有任务完成
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.close()

7. 异步生成器的正确关闭

import asyncio

async def async_generator():
try:
for i in range(10):
yield i
await asyncio.sleep(0.1)
finally:
# 清理代码 - 确保在取消时也会执行
print("生成器清理")

async def main():
# 异步迭代器会被正确清理
async for value in async_generator():
if value > 3:
break # 会触发 finally 块
print(value)

asyncio.run(main())

8. 与线程混合使用

在某些情况下,需要在异步代码和线程代码之间进行交互:

import asyncio
import concurrent.futures

# 从协程中获取当前事件循环
def callback():
loop = asyncio.get_event_loop()
# 在线程中调度协程
asyncio.run_coroutine_threadsafe(some_async_func(), loop)

# 使用 run_in_executor 运行阻塞代码
async def main():
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor()

# 在线程池中运行阻塞函数
result = await loop.run_in_executor(executor, blocking_func, arg)

executor.shutdown(wait=True)

asyncio.run(main())

小结

本章我们学习了:

  1. 异步编程的概念和优势
  2. async/await 语法
  3. 协程和事件循环
  4. 并发运行(gather、wait、as_completed)
  5. 异步上下文管理器
  6. 异步迭代器和生成器
  7. 任务管理(TaskGroup、取消)
  8. 异步队列
  9. 实际应用场景
  10. 常见模式和注意事项

练习

  1. 编写一个异步爬虫,抓取多个网页
  2. 实现一个异步限流器
  3. 创建一个异步任务调度器
  4. 使用 aiohttp 实现 API 请求重试机制