跳到主要内容

同步原语

当多个协程共享资源时,需要同步原语来协调访问,避免竞态条件。asyncio 提供了与 threading 模块类似的同步原语,但它们是为协程设计的。

为什么需要同步?

虽然 asyncio 是单线程的,但协程之间的切换仍可能导致竞态条件:

import asyncio

counter = 0

async def increment():
global counter
for _ in range(10000):
current = counter
# 这里可能发生协程切换!
await asyncio.sleep(0) # 模拟切换点
counter = current + 1

async def main():
# 两个协程同时修改 counter
await asyncio.gather(increment(), increment())
print(f"结果: {counter}") # 预期 20000,实际可能小于

asyncio.run(main())

同步原语可以解决这类问题。

Lock(锁)

Lock 是最基本的同步原语,确保同一时间只有一个协程访问共享资源。

基本用法

import asyncio

counter = 0
lock = asyncio.Lock()

async def increment():
global counter
for _ in range(10000):
async with lock: # 获取锁
current = counter
await asyncio.sleep(0) # 即使这里切换,其他协程也无法进入
counter = current + 1
# 离开 async with 块时自动释放锁

async def main():
await asyncio.gather(increment(), increment())
print(f"结果: {counter}") # 正确输出 20000

asyncio.run(main())

手动获取和释放

import asyncio

lock = asyncio.Lock()

async def worker():
await lock.acquire() # 获取锁
try:
print("持有锁中...")
await asyncio.sleep(1)
finally:
lock.release() # 释放锁

asyncio.run(worker())

非阻塞尝试

import asyncio

lock = asyncio.Lock()

async def try_lock():
if lock.locked():
print("锁已被占用")
else:
# 获取锁
async with lock:
print("成功获取锁")
await asyncio.sleep(1)

asyncio.run(try_lock())

Event(事件)

Event 用于通知一个或多个协程某个事件已发生。

基本用法

import asyncio

event = asyncio.Event()

async def waiter(name):
print(f"{name} 等待事件...")
await event.wait() # 阻塞直到事件被设置
print(f"{name} 收到事件通知!")

async def setter():
print("准备设置事件...")
await asyncio.sleep(2)
print("设置事件!")
event.set() # 设置事件,唤醒所有等待者

async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(waiter("A"))
tg.create_task(waiter("B"))
tg.create_task(setter())

print("所有任务完成")

asyncio.run(main())

输出:

A 等待事件...
B 等待事件...
准备设置事件...
设置事件!
A 收到事件通知!
B 收到事件通知!
所有任务完成

清除和检查状态

import asyncio

event = asyncio.Event()

async def main():
print(f"事件状态: {event.is_set()}") # False

event.set()
print(f"事件状态: {event.is_set()}") # True

event.clear() # 清除事件
print(f"事件状态: {event.is_set()}") # False

asyncio.run(main())

Condition(条件变量)

Condition 结合了 Lock 和 Event 的功能,允许协程等待特定条件。

基本用法

import asyncio
import random

queue = []
condition = asyncio.Condition()

async def producer():
"""生产者:向队列添加数据"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))

async with condition:
queue.append(i)
print(f"生产: {i}")
condition.notify() # 通知一个等待者
# condition.notify_all() # 通知所有等待者

async def consumer():
"""消费者:从队列取数据"""
for _ in range(5):
async with condition:
while not queue: # 必须用 while,防止虚假唤醒
await condition.wait() # 等待通知
item = queue.pop(0)
print(f"消费: {item}")

async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())

asyncio.run(main())

wait_for 条件等待

import asyncio

condition = asyncio.Condition()
ready = False

async def wait_for_ready():
async with condition:
await condition.wait_for(lambda: ready) # 等待条件为真
print("条件满足,继续执行")

async def set_ready():
await asyncio.sleep(1)
async with condition:
global ready
ready = True
condition.notify_all()

async def main():
await asyncio.gather(wait_for_ready(), set_ready())

asyncio.run(main())

Semaphore(信号量)

Semaphore 限制同时访问资源的协程数量。

基本用法

import asyncio

semaphore = asyncio.Semaphore(2) # 最多允许 2 个协程同时访问

async def worker(n):
async with semaphore:
print(f"Worker {n} 开始工作")
await asyncio.sleep(2)
print(f"Worker {n} 完成工作")

async def main():
# 创建 5 个任务,但最多同时运行 2 个
tasks = [asyncio.create_task(worker(i)) for i in range(5)]
await asyncio.gather(*tasks)

asyncio.run(main())

输出(注意时间顺序):

Worker 0 开始工作  # 同时开始
Worker 1 开始工作 # 同时开始
Worker 0 完成工作 # 2秒后
Worker 1 完成工作 # 2秒后
Worker 2 开始工作 # 立即开始
Worker 3 开始工作 # 立即开始
...

限制并发连接数

import asyncio

# 限制最多 3 个并发请求
semaphore = asyncio.Semaphore(3)

async def fetch(url):
async with semaphore:
print(f"获取: {url}")
await asyncio.sleep(1) # 模拟网络请求
return f"{url} 的内容"

async def main():
urls = [f"https://api.example.com/{i}" for i in range(10)]
results = await asyncio.gather(*[fetch(url) for url in urls])
print(f"获取了 {len(results)} 个结果")

asyncio.run(main())

BoundedSemaphore

BoundedSemaphore 是 Semaphore 的变体,防止释放次数超过获取次数:

import asyncio

# 推荐使用 BoundedSemaphore
semaphore = asyncio.BoundedSemaphore(2)

async def worker():
async with semaphore:
print("工作中...")
await asyncio.sleep(1)
# 自动释放

Barrier(Python 3.11+)

Barrier 让多个协程在某个点同步等待,直到所有协程都到达。

import asyncio

async def worker(barrier, name):
print(f"{name} 开始第一阶段")
await asyncio.sleep(1)

# 等待所有协程到达这里
await barrier.wait()

print(f"{name} 开始第二阶段")

async def main():
barrier = asyncio.Barrier(3) # 需要 3 个协程

async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(worker(barrier, f"Worker-{i}"))

asyncio.run(main())

输出:

Worker-0 开始第一阶段
Worker-1 开始第一阶段
Worker-2 开始第一阶段
# 所有协程都到达 barrier 后...
Worker-0 开始第二阶段
Worker-1 开始第二阶段
Worker-2 开始第二阶段

Queue(队列)

asyncio.Queue 是协程间通信的重要工具,线程安全且支持异步操作。

基本用法

import asyncio

async def producer(queue):
for i in range(5):
await queue.put(i)
print(f"生产: {i}")
await asyncio.sleep(0.5)

async def consumer(queue):
while True:
item = await queue.get()
print(f"消费: {item}")
queue.task_done() # 标记任务完成
if item == 4: # 结束条件
break

async def main():
queue = asyncio.Queue()

await asyncio.gather(
producer(queue),
consumer(queue)
)

asyncio.run(main())

队列类型

Queue:先进先出(FIFO)

queue = asyncio.Queue()

PriorityQueue:优先级队列

queue = asyncio.PriorityQueue()
await queue.put((2, "低优先级"))
await queue.put((1, "高优先级"))
# 先取出优先级高的

LifoQueue:后进先出(LIFO)

queue = asyncio.LifoQueue()
await queue.put(1)
await queue.put(2)
# 先取出最后放入的 2

非阻塞操作

import asyncio

queue = asyncio.Queue(maxsize=2) # 最大容量 2

async def main():
await queue.put(1)
await queue.put(2)

# 尝试放入(不阻塞)
try:
queue.put_nowait(3)
except asyncio.QueueFull:
print("队列已满")

# 尝试取出(不阻塞)
try:
item = queue.get_nowait()
print(f"取出: {item}")
except asyncio.QueueEmpty:
print("队列为空")

asyncio.run(main())

等待队列完成

import asyncio

async def producer(queue):
for i in range(5):
await queue.put(i)
await queue.join() # 等待所有任务被处理

async def consumer(queue):
while True:
item = await queue.get()
print(f"处理: {item}")
await asyncio.sleep(0.1)
queue.task_done()

async def main():
queue = asyncio.Queue()

consumer_task = asyncio.create_task(consumer(queue))
await producer(queue)
consumer_task.cancel()

asyncio.run(main())

实践示例

示例 1:限制数据库连接

import asyncio

class DatabasePool:
def __init__(self, max_connections=5):
self.semaphore = asyncio.Semaphore(max_connections)
self.connections = []

async def get_connection(self):
async with self.semaphore:
# 模拟获取连接
print("获取数据库连接")
await asyncio.sleep(0.1)
return "connection"

async def execute(self, query):
conn = await self.get_connection()
print(f"执行查询: {query}")
await asyncio.sleep(0.5)
return f"{query} 的结果"

async def main():
pool = DatabasePool(max_connections=2)

queries = [f"SELECT {i}" for i in range(5)]
results = await asyncio.gather(*[pool.execute(q) for q in queries])

print(f"执行了 {len(results)} 个查询")

asyncio.run(main())

示例 2:发布订阅模式

import asyncio
from collections import defaultdict

class PubSub:
def __init__(self):
self.subscribers = defaultdict(list)

def subscribe(self, topic):
queue = asyncio.Queue()
self.subscribers[topic].append(queue)
return queue

async def publish(self, topic, message):
for queue in self.subscribers[topic]:
await queue.put(message)

async def unsubscribe(self, topic, queue):
self.subscribers[topic].remove(queue)

async def subscriber(name, queue):
while True:
message = await queue.get()
print(f"{name} 收到: {message}")

async def main():
pubsub = PubSub()

# 订阅
queue1 = pubsub.subscribe("news")
queue2 = pubsub.subscribe("news")

# 启动订阅者
sub1 = asyncio.create_task(subscriber("Sub1", queue1))
sub2 = asyncio.create_task(subscriber("Sub2", queue2))

# 发布消息
await pubsub.publish("news", "Hello!")
await pubsub.publish("news", "World!")

await asyncio.sleep(1)
sub1.cancel()
sub2.cancel()

asyncio.run(main())

小结

同步原语用途
Lock独占访问共享资源
Event通知一个或多个协程事件发生
Condition等待特定条件
Semaphore限制并发访问数量
Barrier多协程同步点
Queue协程间安全通信

最佳实践:

  1. 优先使用 async with 语法自动管理锁
  2. 使用 BoundedSemaphore 防止错误释放
  3. 条件等待使用 while 循环防止虚假唤醒
  4. Queue 配合 task_done()join() 实现任务追踪
  5. 合理设置信号量大小,避免过度限制并发