协程基础
协程(Coroutine)是 asyncio 的核心概念。理解协程的工作原理,是掌握异步编程的关键。本章将详细介绍协程的定义、执行方式以及与普通函数的区别。
什么是协程?
协程是一种可以暂停执行并在稍后恢复的函数。与普通函数不同,协程可以在执行过程中"让出"控制权,允许其他代码运行,然后在适当的时候恢复执行。
协程 vs 普通函数
普通函数:一旦开始执行,就会一直运行到结束或遇到 return。
def normal_function():
print("开始")
# 这里做一些计算
print("结束")
return "结果"
# 调用函数,它会完整执行
result = normal_function()
协程:可以在执行过程中暂停,让出控制权。
import asyncio
async def coroutine_function():
print("开始")
# 让出控制权,暂停 1 秒
await asyncio.sleep(1)
print("结束")
return "结果"
# 调用协程不会立即执行!
coro = coroutine_function() # 返回协程对象
print(type(coro)) # <class 'coroutine'>
# 需要通过事件循环运行
asyncio.run(coro)
协程的两个概念
在 Python 文档中,"协程"可能指两个相关但不同的概念:
- 协程函数:使用
async def定义的函数 - 协程对象:调用协程函数返回的对象
# 协程函数
async def my_coro():
return 42
# 协程对象
coro = my_coro()
print(type(my_coro)) # <class 'function'>
print(type(coro)) # <class 'coroutine'>
理解这个区别很重要:协程函数只是定义了协程的行为,调用它只是创建了一个协程对象,并不会执行其中的代码。
async/await 语法
Python 使用 async 和 await 关键字来定义和使用协程。
async def 定义协程
使用 async def 定义的函数就是协程函数:
import asyncio
async def say_hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 运行协程
asyncio.run(say_hello())
注意事项:
- 协程函数内部可以使用
await,普通函数不能 - 调用协程函数返回协程对象,不会立即执行
- 协程函数必须有返回值(默认返回 None)
await 等待异步操作
await 关键字用于等待一个可等待对象(Awaitable)完成:
import asyncio
async def fetch_data():
print("开始获取数据")
await asyncio.sleep(2) # 模拟耗时操作
print("数据获取完成")
return {"name": "张三", "age": 25}
async def main():
# 等待 fetch_data 完成
data = await fetch_data()
print(f"收到数据: {data}")
asyncio.run(main())
await 的作用:
- 暂停当前协程的执行
- 将控制权交还给事件循环
- 等待目标对象完成后恢复执行
- 获取返回值
awaitable 对象
可以被 await 的对象称为"可等待对象"(Awaitable)。主要有三种类型:
1. 协程对象
async def my_coro():
return 42
async def main():
result = await my_coro() # 等待协程
print(result)
asyncio.run(main())
2. Task 对象
async def main():
task = asyncio.create_task(my_coro())
result = await task # 等待任务
print(result)
3. Future 对象
Future 是低层 API,通常由库使用,应用代码很少直接创建:
async def main():
future = asyncio.Future()
# 某处设置结果
future.set_result(42)
result = await future # 等待 Future
print(result)
运行协程
协程不会自动执行,需要通过事件循环来运行。asyncio 提供了多种运行协程的方式。
asyncio.run()
这是运行协程最简单、最推荐的方式(Python 3.7+):
import asyncio
async def main():
print("Hello asyncio!")
asyncio.run(main())
asyncio.run() 的工作流程:
- 创建新的事件循环
- 运行传入的协程
- 关闭事件循环
- 返回协程的结果
重要特点:
- 应该作为 asyncio 程序的主入口点
- 不能在已有事件循环中调用(会抛出 RuntimeError)
- 会处理所有清理工作
在协程中调用协程
在协程内部,可以直接 await 另一个协程:
async def fetch_user():
await asyncio.sleep(1)
return {"id": 1, "name": "张三"}
async def fetch_orders(user_id):
await asyncio.sleep(1)
return [f"订单{i}" for i in range(3)]
async def main():
# 顺序执行两个协程
user = await fetch_user()
orders = await fetch_orders(user["id"])
print(f"用户: {user}, 订单: {orders}")
asyncio.run(main())
这种方式是顺序执行的,总耗时是各个协程耗时之和。
asyncio.create_task()
如果想让多个协程并发执行,需要创建任务:
import asyncio
async def fetch_user():
await asyncio.sleep(1)
return {"id": 1, "name": "张三"}
async def fetch_orders():
await asyncio.sleep(1)
return ["订单1", "订单2", "订单3"]
async def main():
# 创建任务,立即开始执行
task1 = asyncio.create_task(fetch_user())
task2 = asyncio.create_task(fetch_orders())
# 等待两个任务完成
user = await task1
orders = await task2
print(f"用户: {user}, 订单: {orders}")
asyncio.run(main()) # 总耗时约 1 秒(并发执行)
create_task() 将协程包装为 Task 对象并立即调度执行。
协程的生命周期
理解协程的生命周期有助于避免常见错误。
创建阶段
调用协程函数只是创建协程对象:
async def my_coro():
print("执行中...")
return 42
coro = my_coro() # 创建协程对象
# 此时协程内部的代码还没有执行!
执行阶段
协程被调度执行后,会运行到第一个 await 语句:
async def example():
print("第一步") # 立即执行
await asyncio.sleep(1) # 暂停,让出控制权
print("第二步") # 1 秒后恢复执行
完成阶段
协程执行完毕(遇到 return 或函数结束)后,结果会被存储:
async def get_value():
return 42
async def main():
coro = get_value()
result = await coro # 获取返回值
print(result) # 42
# 再次 await 会报错
# await coro # RuntimeError: cannot reuse already awaited coroutine
asyncio.run(main())
重要:协程对象只能被 await 一次,不能重用。
协程的状态
可以使用 asyncio.iscoroutine() 和 asyncio.iscoroutinefunction() 检查:
import asyncio
async def my_coro():
return 42
# 检查是否是协程函数
print(asyncio.iscoroutinefunction(my_coro)) # True
# 检查是否是协程对象
coro = my_coro()
print(asyncio.iscoroutine(coro)) # True
# 检查普通函数
def normal_func():
pass
print(asyncio.iscoroutinefunction(normal_func)) # False
常见错误
错误 1:忘记 await
async def fetch_data():
await asyncio.sleep(1)
return "数据"
async def main():
# 错误:忘记 await
data = fetch_data() # 这只是创建了协程对象!
print(data) # <coroutine object fetch_data at 0x...>
# Python 会警告:RuntimeWarning: coroutine 'fetch_data' was never awaited
asyncio.run(main())
正确做法:
async def main():
data = await fetch_data() # 等待协程完成
print(data) # "数据"
错误 2:在普通函数中使用 await
# 错误:在普通函数中使用 await
def normal_function():
await asyncio.sleep(1) # SyntaxError
# 正确:必须在协程中使用 await
async def async_function():
await asyncio.sleep(1) # 正确
错误 3:在已有事件循环中调用 asyncio.run()
async def inner():
# 错误:在协程中调用 asyncio.run()
asyncio.run(some_coro()) # RuntimeError: asyncio.run() cannot be called from a running event loop
async def main():
await inner()
asyncio.run(main())
正确做法:
async def inner():
# 直接 await 或创建任务
await some_coro()
async def main():
await inner()
错误 4:同步阻塞操作
import asyncio
import time
async def bad_example():
# 错误:使用同步 sleep 会阻塞整个事件循环
time.sleep(3) # 阻塞 3 秒,其他协程无法执行
return "完成"
async def good_example():
# 正确:使用异步 sleep
await asyncio.sleep(3) # 不阻塞,其他协程可以执行
return "完成"
协程的返回值
协程可以通过 return 返回值,调用者通过 await 获取:
import asyncio
async def calculate(a, b):
await asyncio.sleep(0.1)
return a + b
async def main():
result = await calculate(1, 2)
print(f"计算结果: {result}") # 计算结果: 3
asyncio.run(main())
实践示例
示例 1:简单的计时器
import asyncio
async def timer(name, seconds):
print(f"{name} 开始计时 {seconds} 秒")
await asyncio.sleep(seconds)
print(f"{name} 计时结束")
return f"{name} 完成"
async def main():
# 顺序执行
await timer("计时器A", 2)
await timer("计时器B", 1)
# 总耗时: 3 秒
asyncio.run(main())
示例 2:模拟网络请求
import asyncio
import random
async def fetch_url(url):
"""模拟获取 URL 内容"""
print(f"开始获取: {url}")
# 模拟网络延迟
delay = random.uniform(0.5, 2.0)
await asyncio.sleep(delay)
print(f"完成获取: {url} (耗时 {delay:.2f}s)")
return f"{url} 的内容"
async def main():
# 顺序获取多个 URL
urls = ["https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments"]
for url in urls:
content = await fetch_url(url)
print(f"收到内容长度: {len(content)}")
asyncio.run(main())
示例 3:带异常处理的协程
import asyncio
async def risky_operation(n):
if n < 0:
raise ValueError("n 不能为负数")
await asyncio.sleep(1)
return n * 2
async def main():
try:
result = await risky_operation(-1)
except ValueError as e:
print(f"捕获异常: {e}")
else:
print(f"结果: {result}")
asyncio.run(main())
小结
本章介绍了协程的核心概念:
| 概念 | 说明 |
|---|---|
| 协程函数 | 使用 async def 定义的函数 |
| 协程对象 | 调用协程函数返回的对象 |
| await | 等待可等待对象完成并获取结果 |
| asyncio.run() | 运行协程的主入口函数 |
| create_task() | 创建任务实现并发执行 |
协程的核心要点:
- 协程不会自动执行,需要被调度
await会暂停协程,让出控制权- 协程对象只能被 await 一次
- 避免在协程中使用阻塞操作
下一章将介绍事件循环,深入理解协程是如何被调度和执行的。