跳到主要内容

任务与并发

任务是协程的包装器,让协程可以被调度并发执行。本章介绍如何创建和管理任务,以及多种并发执行协程的方式。

什么是任务?

Task 是协程的包装器,用于调度和跟踪协程的执行状态。

import asyncio

async def my_coro():
await asyncio.sleep(1)
return "完成"

async def main():
# 直接 await 协程:顺序执行
result = await my_coro()

# 包装为任务:可以被调度并发执行
task = asyncio.create_task(my_coro())
result = await task

asyncio.run(main())

协程 vs 任务

特性协程任务
创建方式调用协程函数create_task()TaskGroup
执行时机被 await 时执行创建后立即调度
可取消性不能直接取消可以调用 cancel()
状态查询无状态pending/running/done/cancelled

创建任务

asyncio.create_task()

最常用的创建任务方式(Python 3.7+):

import asyncio

async def worker(name, delay):
print(f"{name} 开始工作")
await asyncio.sleep(delay)
print(f"{name} 完成工作")
return f"{name} 的结果"

async def main():
# 创建任务,立即开始执行
task1 = asyncio.create_task(worker("Worker-1", 2))
task2 = asyncio.create_task(worker("Worker-2", 1))

# 可以做其他事情
print("任务已创建,正在执行...")

# 等待任务完成
result1 = await task1
result2 = await task2

print(f"结果: {result1}, {result2}")

asyncio.run(main())

输出:

任务已创建,正在执行...
Worker-1 开始工作
Worker-2 开始工作
Worker-2 完成工作
Worker-1 完成工作
结果: Worker-1 的结果, Worker-2 的结果

注意:两个任务几乎同时开始,Worker-2 先完成(因为延迟更短)。

任务命名

可以为任务设置名称,便于调试:

import asyncio

async def worker():
await asyncio.sleep(1)

async def main():
task = asyncio.create_task(worker(), name="my-worker-task")
print(f"任务名称: {task.get_name()}")
await task

asyncio.run(main())

保存任务引用

重要:必须保存任务的引用,否则可能被垃圾回收:

# 错误:任务可能被垃圾回收
asyncio.create_task(some_coro())

# 正确:保存引用
tasks = set()
task = asyncio.create_task(some_coro())
tasks.add(task)
task.add_done_callback(tasks.discard) # 完成后移除

TaskGroup(Python 3.11+)

TaskGroup 是更现代、更安全的任务管理方式:

import asyncio

async def worker(name, delay):
print(f"{name} 开始")
await asyncio.sleep(delay)
print(f"{name} 完成")
return name

async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(worker("A", 1))
task2 = tg.create_task(worker("B", 2))
task3 = tg.create_task(worker("C", 1.5))

# 所有任务完成后继续
print(f"所有任务完成: {task1.result()}, {task2.result()}, {task3.result()}")

asyncio.run(main())

TaskGroup 的优势

  1. 自动等待:退出 async with 块时自动等待所有任务完成
  2. 异常传播:任一任务失败时,自动取消其他任务并传播异常
  3. 结构化并发:任务的生命周期与作用域绑定

异常处理

import asyncio

async def failing_task():
await asyncio.sleep(0.5)
raise ValueError("任务失败!")

async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(failing_task())
tg.create_task(asyncio.sleep(10)) # 会被取消
except* ValueError as e:
print(f"捕获到异常: {e}")

asyncio.run(main())

注意 except* 语法,这是 Python 3.11 引入的异常组语法。

并发执行

asyncio.gather()

同时运行多个协程并收集结果:

import asyncio

async def fetch(url):
print(f"获取 {url}")
await asyncio.sleep(1)
return f"{url} 的数据"

async def main():
urls = [
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments"
]

# 并发获取所有 URL
results = await asyncio.gather(*[fetch(url) for url in urls])

for result in results:
print(result)

asyncio.run(main())

gather() 的特点:

  • 返回结果列表,顺序与输入一致
  • 默认情况下,一个失败会传播异常
  • 可以设置 return_exceptions=True 将异常作为结果返回
async def main():
results = await asyncio.gather(
fetch("url1"),
fetch("url2"),
return_exceptions=True # 异常不会传播,而是作为结果返回
)

for result in results:
if isinstance(result, Exception):
print(f"请求失败: {result}")
else:
print(f"请求成功: {result}")

asyncio.wait()

更底层的等待方式,返回已完成和未完成的任务集合:

import asyncio

async def worker(n):
await asyncio.sleep(n)
return n

async def main():
tasks = [asyncio.create_task(worker(i)) for i in [3, 1, 2]]

# 等待所有任务完成
done, pending = await asyncio.wait(tasks)

for task in done:
print(f"结果: {task.result()}")

asyncio.run(main())

wait() 的返回条件:

# 等待所有任务完成(默认)
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

# 任一任务完成即返回
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

# 任一任务抛出异常即返回
await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

asyncio.as_completed()

按完成顺序获取结果:

import asyncio

async def worker(n):
await asyncio.sleep(n)
return n

async def main():
tasks = [asyncio.create_task(worker(i)) for i in [3, 1, 2]]

# 按完成顺序处理
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完成: {result}")

asyncio.run(main())

输出:

完成: 1
完成: 2
完成: 3

Python 3.13+ 异步迭代

从 Python 3.13 开始,as_completed() 返回的对象可以作为异步迭代器使用,这样可以更容易地追踪原始任务:

import asyncio

async def worker(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成"

async def main():
task1 = asyncio.create_task(worker("任务A", 3))
task2 = asyncio.create_task(worker("任务B", 1))
task3 = asyncio.create_task(worker("任务C", 2))

tasks = [task1, task2, task3]

# Python 3.13+:使用 async for 异步迭代
async for completed_task in asyncio.as_completed(tasks):
# completed_task 是原始任务对象
result = await completed_task

# 可以识别是哪个任务
if completed_task is task1:
print(f"任务A结果: {result}")
elif completed_task is task2:
print(f"任务B结果: {result}")
else:
print(f"任务C结果: {result}")

asyncio.run(main())

新旧方式对比

# 旧方式(Python < 3.13):返回的是协程,不是原始任务
for coro in asyncio.as_completed(tasks):
result = await coro # coro 是新创建的协程,不是原始任务

# 新方式(Python 3.13+):返回原始任务
async for task in asyncio.as_completed(tasks):
result = await task # task 是原始任务对象

任务控制

取消任务

import asyncio

async def long_running():
try:
print("开始长时间任务")
await asyncio.sleep(10)
print("任务完成")
except asyncio.CancelledError:
print("任务被取消")
raise # 应该重新抛出,让调用者知道

async def main():
task = asyncio.create_task(long_running())

await asyncio.sleep(1)
task.cancel() # 取消任务

try:
await task
except asyncio.CancelledError:
print("捕获到取消异常")

asyncio.run(main())

检查任务状态

import asyncio

async def worker():
await asyncio.sleep(1)
return "完成"

async def main():
task = asyncio.create_task(worker())

print(f"完成?: {task.done()}") # False
print(f"已取消?: {task.cancelled()}") # False

await task

print(f"完成?: {task.done()}") # True
print(f"结果: {task.result()}") # "完成"

asyncio.run(main())

超时控制

import asyncio

async def slow_operation():
await asyncio.sleep(10)
return "完成"

async def main():
try:
# 方式一:wait_for
result = await asyncio.wait_for(slow_operation(), timeout=2.0)
except asyncio.TimeoutError:
print("操作超时")

# 方式二:timeout 上下文管理器(Python 3.11+)
try:
async with asyncio.timeout(2.0):
result = await slow_operation()
except TimeoutError:
print("操作超时")

asyncio.run(main())

保护任务不被取消(shield)

asyncio.shield() 用于保护一个可等待对象不被外部取消。

基本用法

当外部代码尝试取消一个正在执行的任务时,使用 shield() 可以保护该任务继续执行:

import asyncio

async def important_task():
"""一个重要的、不应被中断的任务"""
print("开始执行重要任务...")
await asyncio.sleep(3)
print("重要任务完成!")
return "重要结果"

async def main():
# 创建受保护的任务
task = asyncio.create_task(important_task())

try:
# 使用 shield 保护任务不被取消
result = await asyncio.shield(task)
print(f"获得结果: {result}")
except asyncio.CancelledError:
# 外部取消了等待,但任务本身仍在执行
print("等待被取消,但任务仍在执行")
# 任务不会被取消,可以稍后再次等待
result = await task
print(f"最终结果: {result}")

asyncio.run(main())

shield 的工作原理

shield() 创建一个保护层,它的行为如下:

import asyncio

async def background_work():
print("后台工作开始")
await asyncio.sleep(5)
print("后台工作完成")
return "完成"

async def main():
task = asyncio.create_task(background_work())

# 不使用 shield
# 如果取消,task 也会被取消

# 使用 shield
protected = asyncio.shield(task)

# 如果取消 protected:
# - protected 会抛出 CancelledError
# - task 不会被取消,继续执行

try:
await asyncio.wait_for(protected, timeout=2)
except asyncio.TimeoutError:
print("等待超时,但后台任务仍在执行")
# 可以稍后等待 task 的结果
result = await task
print(f"结果: {result}")

asyncio.run(main())

实际应用场景

场景 1:保护关键操作

import asyncio

async def save_to_database(data):
"""保存数据到数据库,这个操作不应被中断"""
print(f"保存数据: {data}")
await asyncio.sleep(2) # 模拟数据库操作
print("数据保存成功")
return True

async def handle_request(request_id):
"""处理请求"""
try:
# 使用 shield 保护数据库操作
await asyncio.shield(save_to_database(f"数据-{request_id}"))
except asyncio.CancelledError:
print(f"请求 {request_id} 被取消,但数据已保存")
raise # 继续传播取消

async def main():
task = asyncio.create_task(handle_request(1))
await asyncio.sleep(0.5)
task.cancel()

try:
await task
except asyncio.CancelledError:
print("请求处理完成(已取消)")

asyncio.run(main())

场景 2:超时控制与任务保护

import asyncio

async def long_operation():
"""长时间操作"""
await asyncio.sleep(10)
return "操作结果"

async def main():
task = asyncio.create_task(long_operation())

try:
# 设置超时,但不取消任务本身
result = await asyncio.wait_for(asyncio.shield(task), timeout=3)
except asyncio.TimeoutError:
print("等待超时")
# 任务仍在执行,可以稍后获取结果
print("等待任务完成...")
result = await task
print(f"最终结果: {result}")

asyncio.run(main())

重要注意事项

  1. 保存任务引用:事件循环只保持对任务的弱引用,必须保存引用:
async def main():
# 错误:任务可能被垃圾回收
await asyncio.shield(some_coro())

# 正确:保存任务引用
task = asyncio.create_task(some_coro())
await asyncio.shield(task)
  1. shield 不能防止所有取消:如果任务从内部被取消,shield 无法保护:
async def self_cancelling():
task = asyncio.current_task()
task.cancel() # 从内部取消
await asyncio.sleep(1)

async def main():
# shield 无法保护内部取消
await asyncio.shield(self_cancelling())
  1. 完全忽略取消(不推荐)
async def main():
task = asyncio.create_task(something())
try:
res = await asyncio.shield(task)
except asyncio.CancelledError:
res = None # 忽略取消,但任务仍在执行

任务回调

可以为任务添加完成时的回调函数:

import asyncio

def on_task_done(task):
try:
result = task.result()
print(f"回调: 任务完成,结果={result}")
except Exception as e:
print(f"回调: 任务失败,异常={e}")

async def worker():
await asyncio.sleep(1)
return "工作完成"

async def main():
task = asyncio.create_task(worker())
task.add_done_callback(on_task_done)
await task

asyncio.run(main())

实践示例

示例 1:并发下载器

import asyncio

async def download(url, session):
"""模拟下载"""
print(f"开始下载: {url}")
await asyncio.sleep(1) # 模拟网络延迟
print(f"下载完成: {url}")
return f"{url} 的内容"

async def download_all(urls):
"""并发下载所有 URL"""
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(download(url, None)) for url in urls]

return [task.result() for task in tasks]

async def main():
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3",
]

results = await download_all(urls)
print(f"共下载 {len(results)} 个文件")

asyncio.run(main())

示例 2:带重试的任务

import asyncio
import random

async def unreliable_task():
"""模拟可能失败的任务"""
if random.random() < 0.7: # 70% 概率失败
raise RuntimeError("任务失败")
return "成功"

async def retry_task(coro, max_retries=3, delay=1):
"""带重试的任务执行"""
for attempt in range(max_retries):
try:
return await coro()
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"第 {attempt + 1} 次尝试失败: {e}{delay}秒后重试")
await asyncio.sleep(delay)

async def main():
result = await retry_task(unreliable_task)
print(f"最终结果: {result}")

asyncio.run(main())

示例 3:生产者消费者

import asyncio
import random

async def producer(queue, producer_id):
"""生产者"""
for i in range(5):
item = f"产品-{producer_id}-{i}"
await queue.put(item)
print(f"生产者 {producer_id} 生产: {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))

# 标记生产结束
await queue.put(None)

async def consumer(queue, consumer_id):
"""消费者"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break

print(f"消费者 {consumer_id} 消费: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()

async def main():
queue = asyncio.Queue(maxsize=10)

# 启动生产者和消费者
async with asyncio.TaskGroup() as tg:
producers = [tg.create_task(producer(queue, i)) for i in range(2)]
consumers = [tg.create_task(consumer(queue, i)) for i in range(3)]

print("所有任务完成")

asyncio.run(main())

小结

本章介绍了任务和并发执行的核心 API:

API用途
create_task()创建任务
TaskGroup结构化并发(Python 3.11+,推荐)
gather()并发执行并收集结果
wait()等待任务完成
as_completed()按完成顺序处理
cancel()取消任务
wait_for()带超时的等待
timeout()超时上下文管理器(Python 3.11+)
shield()保护任务不被外部取消

最佳实践:

  1. 优先使用 TaskGroup 管理任务(Python 3.11+)
  2. 总是保存任务引用,避免被垃圾回收
  3. 正确处理 CancelledError,使用 try/finally 清理资源
  4. 为长时间任务设置超时
  5. 使用 shield() 保护关键操作不被取消
  6. 使用队列实现生产者消费者模式