任务与并发
任务是协程的包装器,让协程可以被调度并发执行。本章介绍如何创建和管理任务,以及多种并发执行协程的方式。
什么是任务?
Task 是协程的包装器,用于调度和跟踪协程的执行状态。
import asyncio
async def my_coro():
await asyncio.sleep(1)
return "完成"
async def main():
# 直接 await 协程:顺序执行
result = await my_coro()
# 包装为任务:可以被调度并发执行
task = asyncio.create_task(my_coro())
result = await task
asyncio.run(main())
协程 vs 任务
| 特性 | 协程 | 任务 |
|---|---|---|
| 创建方式 | 调用协程函数 | create_task() 或 TaskGroup |
| 执行时机 | 被 await 时执行 | 创建后立即调度 |
| 可取消性 | 不能直接取消 | 可以调用 cancel() |
| 状态查询 | 无状态 | pending/running/done/cancelled |
创建任务
asyncio.create_task()
最常用的创建任务方式(Python 3.7+):
import asyncio
async def worker(name, delay):
print(f"{name} 开始工作")
await asyncio.sleep(delay)
print(f"{name} 完成工作")
return f"{name} 的结果"
async def main():
# 创建任务,立即开始执行
task1 = asyncio.create_task(worker("Worker-1", 2))
task2 = asyncio.create_task(worker("Worker-2", 1))
# 可以做其他事情
print("任务已创建,正在执行...")
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"结果: {result1}, {result2}")
asyncio.run(main())
输出:
任务已创建,正在执行...
Worker-1 开始工作
Worker-2 开始工作
Worker-2 完成工作
Worker-1 完成工作
结果: Worker-1 的结果, Worker-2 的结果
注意:两个任务几乎同时开始,Worker-2 先完成(因为延迟更短)。
任务命名
可以为任务设置名称,便于调试:
import asyncio
async def worker():
await asyncio.sleep(1)
async def main():
task = asyncio.create_task(worker(), name="my-worker-task")
print(f"任务名称: {task.get_name()}")
await task
asyncio.run(main())
保存任务引用
重要:必须保存任务的引用,否则可能被垃圾回收:
# 错误:任务可能被垃圾回收
asyncio.create_task(some_coro())
# 正确:保存引用
tasks = set()
task = asyncio.create_task(some_coro())
tasks.add(task)
task.add_done_callback(tasks.discard) # 完成后移除
TaskGroup(Python 3.11+)
TaskGroup 是更现代、更安全的任务管理方式:
import asyncio
async def worker(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return name
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(worker("A", 1))
task2 = tg.create_task(worker("B", 2))
task3 = tg.create_task(worker("C", 1.5))
# 所有任务完成后继续
print(f"所有任务完成: {task1.result()}, {task2.result()}, {task3.result()}")
asyncio.run(main())
TaskGroup 的优势
- 自动等待:退出
async with块时自动等待所有任务完成 - 异常传播:任一任务失败时,自动取消其他任务并传播异常
- 结构化并发:任务的生命周期与作用域绑定
异常处理
import asyncio
async def failing_task():
await asyncio.sleep(0.5)
raise ValueError("任务失败!")
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task())
tg.create_task(asyncio.sleep(10)) # 会被取消
except* ValueError as e:
print(f"捕获到异常: {e}")
asyncio.run(main())
注意 except* 语法,这是 Python 3.11 引入的异常组语法。
并发执行
asyncio.gather()
同时运行多个协程并收集结果:
import asyncio
async def fetch(url):
print(f"获取 {url}")
await asyncio.sleep(1)
return f"{url} 的数据"
async def main():
urls = [
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments"
]
# 并发获取所有 URL
results = await asyncio.gather(*[fetch(url) for url in urls])
for result in results:
print(result)
asyncio.run(main())
gather() 的特点:
- 返回结果列表,顺序与输入一致
- 默认情况下,一个失败会传播异常
- 可以设置
return_exceptions=True将异常作为结果返回
async def main():
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
return_exceptions=True # 异常不会传播,而是作为结果返回
)
for result in results:
if isinstance(result, Exception):
print(f"请求失败: {result}")
else:
print(f"请求成功: {result}")
asyncio.wait()
更底层的等待方式,返回已完成和未完成的任务集合:
import asyncio
async def worker(n):
await asyncio.sleep(n)
return n
async def main():
tasks = [asyncio.create_task(worker(i)) for i in [3, 1, 2]]
# 等待所有任务完成
done, pending = await asyncio.wait(tasks)
for task in done:
print(f"结果: {task.result()}")
asyncio.run(main())
wait() 的返回条件:
# 等待所有任务完成(默认)
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
# 任一任务完成即返回
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# 任一任务抛出异常即返回
await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
asyncio.as_completed()
按完成顺序获取结果:
import asyncio
async def worker(n):
await asyncio.sleep(n)
return n
async def main():
tasks = [asyncio.create_task(worker(i)) for i in [3, 1, 2]]
# 按完成顺序处理
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")
asyncio.run(main())
输出:
完成: 1
完成: 2
完成: 3
Python 3.13+ 异步迭代
从 Python 3.13 开始,as_completed() 返回的对象可以作为异步迭代器使用,这样可以更容易地追踪原始任务:
import asyncio
async def worker(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成"
async def main():
task1 = asyncio.create_task(worker("任务A", 3))
task2 = asyncio.create_task(worker("任务B", 1))
task3 = asyncio.create_task(worker("任务C", 2))
tasks = [task1, task2, task3]
# Python 3.13+:使用 async for 异步迭代
async for completed_task in asyncio.as_completed(tasks):
# completed_task 是原始任务对象
result = await completed_task
# 可以识别是哪个任务
if completed_task is task1:
print(f"任务A结果: {result}")
elif completed_task is task2:
print(f"任务B结果: {result}")
else:
print(f"任务C结果: {result}")
asyncio.run(main())
新旧方式对比:
# 旧方式(Python < 3.13):返回的是协程,不是原始任务
for coro in asyncio.as_completed(tasks):
result = await coro # coro 是新创建的协程,不是原始任务
# 新方式(Python 3.13+):返回原始任务
async for task in asyncio.as_completed(tasks):
result = await task # task 是原始任务对象
任务控制
取消任务
import asyncio
async def long_running():
try:
print("开始长时间任务")
await asyncio.sleep(10)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消")
raise # 应该重新抛出,让调用者知道
async def main():
task = asyncio.create_task(long_running())
await asyncio.sleep(1)
task.cancel() # 取消任务
try:
await task
except asyncio.CancelledError:
print("捕获到取消异常")
asyncio.run(main())
检查任务状态
import asyncio
async def worker():
await asyncio.sleep(1)
return "完成"
async def main():
task = asyncio.create_task(worker())
print(f"完成?: {task.done()}") # False
print(f"已取消?: {task.cancelled()}") # False
await task
print(f"完成?: {task.done()}") # True
print(f"结果: {task.result()}") # "完成"
asyncio.run(main())
超时控制
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 方式一:wait_for
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("操作超时")
# 方式二:timeout 上下文管理器(Python 3.11+)
try:
async with asyncio.timeout(2.0):
result = await slow_operation()
except TimeoutError:
print("操作超时")
asyncio.run(main())
保护任务不被取消(shield)
asyncio.shield() 用于保护一个可等待对象不被外部取消。
基本用法
当外部代码尝试取消一个正在执行的任务时,使用 shield() 可以保护该任务继续执行:
import asyncio
async def important_task():
"""一个重要的、不应被中断的任务"""
print("开始执行重要任务...")
await asyncio.sleep(3)
print("重要任务完成!")
return "重要结果"
async def main():
# 创建受保护的任务
task = asyncio.create_task(important_task())
try:
# 使用 shield 保护任务不被取消
result = await asyncio.shield(task)
print(f"获得结果: {result}")
except asyncio.CancelledError:
# 外部取消了等待,但任务本身仍在执行
print("等待被取消,但任务仍在执行")
# 任务不会被取消,可以稍后再次等待
result = await task
print(f"最终结果: {result}")
asyncio.run(main())
shield 的工作原理
shield() 创建一个保护层,它的行为如下:
import asyncio
async def background_work():
print("后台工作开始")
await asyncio.sleep(5)
print("后台工作完成")
return "完成"
async def main():
task = asyncio.create_task(background_work())
# 不使用 shield
# 如果取消,task 也会被取消
# 使用 shield
protected = asyncio.shield(task)
# 如果取消 protected:
# - protected 会抛出 CancelledError
# - task 不会被取消,继续执行
try:
await asyncio.wait_for(protected, timeout=2)
except asyncio.TimeoutError:
print("等待超时,但后台任务仍在执行")
# 可以稍后等待 task 的结果
result = await task
print(f"结果: {result}")
asyncio.run(main())
实际应用场景
场景 1:保护关键操作
import asyncio
async def save_to_database(data):
"""保存数据到数据库,这个操作不应被中断"""
print(f"保存数据: {data}")
await asyncio.sleep(2) # 模拟数据库操作
print("数据保存成功")
return True
async def handle_request(request_id):
"""处理请求"""
try:
# 使用 shield 保护数据库操作
await asyncio.shield(save_to_database(f"数据-{request_id}"))
except asyncio.CancelledError:
print(f"请求 {request_id} 被取消,但数据已保存")
raise # 继续传播取消
async def main():
task = asyncio.create_task(handle_request(1))
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("请求处理完成(已取消)")
asyncio.run(main())
场景 2:超时控制与任务保护
import asyncio
async def long_operation():
"""长时间操作"""
await asyncio.sleep(10)
return "操作结果"
async def main():
task = asyncio.create_task(long_operation())
try:
# 设置超时,但不取消任务本身
result = await asyncio.wait_for(asyncio.shield(task), timeout=3)
except asyncio.TimeoutError:
print("等待超时")
# 任务仍在执行,可以稍后获取结果
print("等待任务完成...")
result = await task
print(f"最终结果: {result}")
asyncio.run(main())
重要注意事项
- 保存任务引用:事件循环只保持对任务的弱引用,必须保存引用:
async def main():
# 错误:任务可能被垃圾回收
await asyncio.shield(some_coro())
# 正确:保存任务引用
task = asyncio.create_task(some_coro())
await asyncio.shield(task)
- shield 不能防止所有取消:如果任务从内部被取消,shield 无法保护:
async def self_cancelling():
task = asyncio.current_task()
task.cancel() # 从内部取消
await asyncio.sleep(1)
async def main():
# shield 无法保护内部取消
await asyncio.shield(self_cancelling())
- 完全忽略取消(不推荐):
async def main():
task = asyncio.create_task(something())
try:
res = await asyncio.shield(task)
except asyncio.CancelledError:
res = None # 忽略取消,但任务仍在执行
任务回调
可以为任务添加完成时的回调函数:
import asyncio
def on_task_done(task):
try:
result = task.result()
print(f"回调: 任务完成,结果={result}")
except Exception as e:
print(f"回调: 任务失败,异常={e}")
async def worker():
await asyncio.sleep(1)
return "工作完成"
async def main():
task = asyncio.create_task(worker())
task.add_done_callback(on_task_done)
await task
asyncio.run(main())
实践示例
示例 1:并发下载器
import asyncio
async def download(url, session):
"""模拟下载"""
print(f"开始下载: {url}")
await asyncio.sleep(1) # 模拟网络延迟
print(f"下载完成: {url}")
return f"{url} 的内容"
async def download_all(urls):
"""并发下载所有 URL"""
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(download(url, None)) for url in urls]
return [task.result() for task in tasks]
async def main():
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
]
results = await download_all(urls)
print(f"共下载 {len(results)} 个文件")
asyncio.run(main())
示例 2:带重试的任务
import asyncio
import random
async def unreliable_task():
"""模拟可能失败的任务"""
if random.random() < 0.7: # 70% 概率失败
raise RuntimeError("任务失败")
return "成功"
async def retry_task(coro, max_retries=3, delay=1):
"""带重试的任务执行"""
for attempt in range(max_retries):
try:
return await coro()
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"第 {attempt + 1} 次尝试失败: {e},{delay}秒后重试")
await asyncio.sleep(delay)
async def main():
result = await retry_task(unreliable_task)
print(f"最终结果: {result}")
asyncio.run(main())
示例 3:生产者消费者
import asyncio
import random
async def producer(queue, producer_id):
"""生产者"""
for i in range(5):
item = f"产品-{producer_id}-{i}"
await queue.put(item)
print(f"生产者 {producer_id} 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# 标记生产结束
await queue.put(None)
async def consumer(queue, consumer_id):
"""消费者"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"消费者 {consumer_id} 消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 启动生产者和消费者
async with asyncio.TaskGroup() as tg:
producers = [tg.create_task(producer(queue, i)) for i in range(2)]
consumers = [tg.create_task(consumer(queue, i)) for i in range(3)]
print("所有任务完成")
asyncio.run(main())
小结
本章介绍了任务和并发执行的核心 API:
| API | 用途 |
|---|---|
create_task() | 创建任务 |
TaskGroup | 结构化并发(Python 3.11+,推荐) |
gather() | 并发执行并收集结果 |
wait() | 等待任务完成 |
as_completed() | 按完成顺序处理 |
cancel() | 取消任务 |
wait_for() | 带超时的等待 |
timeout() | 超时上下文管理器(Python 3.11+) |
shield() | 保护任务不被外部取消 |
最佳实践:
- 优先使用
TaskGroup管理任务(Python 3.11+) - 总是保存任务引用,避免被垃圾回收
- 正确处理
CancelledError,使用try/finally清理资源 - 为长时间任务设置超时
- 使用
shield()保护关键操作不被取消 - 使用队列实现生产者消费者模式