同步原语
当多个协程共享资源时,需要同步原语来协调访问,避免竞态条件。asyncio 提供了与 threading 模块类似的同步原语,但它们是为协程设计的。
为什么需要同步?
虽然 asyncio 是单线程的,但协程之间的切换仍可能导致竞态条件:
import asyncio
counter = 0
async def increment():
global counter
for _ in range(10000):
current = counter
# 这里可能发生协程切换!
await asyncio.sleep(0) # 模拟切换点
counter = current + 1
async def main():
# 两个协程同时修改 counter
await asyncio.gather(increment(), increment())
print(f"结果: {counter}") # 预期 20000,实际可能小于
asyncio.run(main())
同步原语可以解决这类问题。
Lock(锁)
Lock 是最基本的同步原语,确保同一时间只有一个协程访问共享资源。
基本用法
import asyncio
counter = 0
lock = asyncio.Lock()
async def increment():
global counter
for _ in range(10000):
async with lock: # 获取锁
current = counter
await asyncio.sleep(0) # 即使这里切换,其他协程也无法进入
counter = current + 1
# 离开 async with 块时自动释放锁
async def main():
await asyncio.gather(increment(), increment())
print(f"结果: {counter}") # 正确输出 20000
asyncio.run(main())
手动获取和释放
import asyncio
lock = asyncio.Lock()
async def worker():
await lock.acquire() # 获取锁
try:
print("持有锁中...")
await asyncio.sleep(1)
finally:
lock.release() # 释放锁
asyncio.run(worker())
非阻塞尝试
import asyncio
lock = asyncio.Lock()
async def try_lock():
if lock.locked():
print("锁已被占用")
else:
# 获取锁
async with lock:
print("成功获取锁")
await asyncio.sleep(1)
asyncio.run(try_lock())
Event(事件)
Event 用于通知一个或多个协程某个事件已发生。
基本用法
import asyncio
event = asyncio.Event()
async def waiter(name):
print(f"{name} 等待事件...")
await event.wait() # 阻塞直到事件被设置
print(f"{name} 收到事件通知!")
async def setter():
print("准备设置事件...")
await asyncio.sleep(2)
print("设置事件!")
event.set() # 设置事件,唤醒所有等待者
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(waiter("A"))
tg.create_task(waiter("B"))
tg.create_task(setter())
print("所有任务完成")
asyncio.run(main())
输出:
A 等待事件...
B 等待事件...
准备设置事件...
设置事件!
A 收到事件通知!
B 收到事件通知!
所有任务完成
清除和检查状态
import asyncio
event = asyncio.Event()
async def main():
print(f"事件状态: {event.is_set()}") # False
event.set()
print(f"事件状态: {event.is_set()}") # True
event.clear() # 清除事件
print(f"事件状态: {event.is_set()}") # False
asyncio.run(main())
Condition(条件变量)
Condition 结合了 Lock 和 Event 的功能,允许协程等待特定条件。
基本用法
import asyncio
import random
queue = []
condition = asyncio.Condition()
async def producer():
"""生产者:向队列添加数据"""
for i in range(5):
await asyncio.sleep(random.uniform(0.1, 0.5))
async with condition:
queue.append(i)
print(f"生产: {i}")
condition.notify() # 通知一个等待者
# condition.notify_all() # 通知所有等待者
async def consumer():
"""消费者:从队列取数据"""
for _ in range(5):
async with condition:
while not queue: # 必须用 while,防止虚假唤醒
await condition.wait() # 等待通知
item = queue.pop(0)
print(f"消费: {item}")
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())
asyncio.run(main())
wait_for 条件等待
import asyncio
condition = asyncio.Condition()
ready = False
async def wait_for_ready():
async with condition:
await condition.wait_for(lambda: ready) # 等待条件为真
print("条件满足,继续执行")
async def set_ready():
await asyncio.sleep(1)
async with condition:
global ready
ready = True
condition.notify_all()
async def main():
await asyncio.gather(wait_for_ready(), set_ready())
asyncio.run(main())
Semaphore(信号量)
Semaphore 限制同时访问资源的协程数量。
基本用法
import asyncio
semaphore = asyncio.Semaphore(2) # 最多允许 2 个协程同时访问
async def worker(n):
async with semaphore:
print(f"Worker {n} 开始工作")
await asyncio.sleep(2)
print(f"Worker {n} 完成工作")
async def main():
# 创建 5 个任务,但最多同时运行 2 个
tasks = [asyncio.create_task(worker(i)) for i in range(5)]
await asyncio.gather(*tasks)
asyncio.run(main())
输出(注意时间顺序):
Worker 0 开始工作 # 同时开始
Worker 1 开始工作 # 同时开始
Worker 0 完成工作 # 2秒后
Worker 1 完成工作 # 2秒后
Worker 2 开始工作 # 立即开始
Worker 3 开始工作 # 立即开始
...
限制并发连接数
import asyncio
# 限制最多 3 个并发请求
semaphore = asyncio.Semaphore(3)
async def fetch(url):
async with semaphore:
print(f"获取: {url}")
await asyncio.sleep(1) # 模拟网络请求
return f"{url} 的内容"
async def main():
urls = [f"https://api.example.com/{i}" for i in range(10)]
results = await asyncio.gather(*[fetch(url) for url in urls])
print(f"获取了 {len(results)} 个结果")
asyncio.run(main())
BoundedSemaphore
BoundedSemaphore 是 Semaphore 的变体,防止释放次数超过获取次数:
import asyncio
# 推荐使用 BoundedSemaphore
semaphore = asyncio.BoundedSemaphore(2)
async def worker():
async with semaphore:
print("工作中...")
await asyncio.sleep(1)
# 自动释放
Barrier(Python 3.11+)
Barrier 让多个协程在某个点同步等待,直到所有协程都到达。
import asyncio
async def worker(barrier, name):
print(f"{name} 开始第一阶段")
await asyncio.sleep(1)
# 等待所有协程到达这里
await barrier.wait()
print(f"{name} 开始第二阶段")
async def main():
barrier = asyncio.Barrier(3) # 需要 3 个协程
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(worker(barrier, f"Worker-{i}"))
asyncio.run(main())
输出:
Worker-0 开始第一阶段
Worker-1 开始第一阶段
Worker-2 开始第一阶段
# 所有协程都到达 barrier 后...
Worker-0 开始第二阶段
Worker-1 开始第二阶段
Worker-2 开始第二阶段
Queue(队列)
asyncio.Queue 是协程间通信的重要工具,线程安全且支持异步操作。
基本用法
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(i)
print(f"生产: {i}")
await asyncio.sleep(0.5)
async def consumer(queue):
while True:
item = await queue.get()
print(f"消费: {item}")
queue.task_done() # 标记任务完成
if item == 4: # 结束条件
break
async def main():
queue = asyncio.Queue()
await asyncio.gather(
producer(queue),
consumer(queue)
)
asyncio.run(main())
队列类型
Queue:先进先出(FIFO)
queue = asyncio.Queue()
PriorityQueue:优先级队列
queue = asyncio.PriorityQueue()
await queue.put((2, "低优先级"))
await queue.put((1, "高优先级"))
# 先取出优先级高的
LifoQueue:后进先出(LIFO)
queue = asyncio.LifoQueue()
await queue.put(1)
await queue.put(2)
# 先取出最后放入的 2
非阻塞操作
import asyncio
queue = asyncio.Queue(maxsize=2) # 最大容量 2
async def main():
await queue.put(1)
await queue.put(2)
# 尝试放入(不阻塞)
try:
queue.put_nowait(3)
except asyncio.QueueFull:
print("队列已满")
# 尝试取出(不阻塞)
try:
item = queue.get_nowait()
print(f"取出: {item}")
except asyncio.QueueEmpty:
print("队列为空")
asyncio.run(main())
等待队列完成
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(i)
await queue.join() # 等待所有任务被处理
async def consumer(queue):
while True:
item = await queue.get()
print(f"处理: {item}")
await asyncio.sleep(0.1)
queue.task_done()
async def main():
queue = asyncio.Queue()
consumer_task = asyncio.create_task(consumer(queue))
await producer(queue)
consumer_task.cancel()
asyncio.run(main())
实践示例
示例 1:限制数据库连接
import asyncio
class DatabasePool:
def __init__(self, max_connections=5):
self.semaphore = asyncio.Semaphore(max_connections)
self.connections = []
async def get_connection(self):
async with self.semaphore:
# 模拟获取连接
print("获取数据库连接")
await asyncio.sleep(0.1)
return "connection"
async def execute(self, query):
conn = await self.get_connection()
print(f"执行查询: {query}")
await asyncio.sleep(0.5)
return f"{query} 的结果"
async def main():
pool = DatabasePool(max_connections=2)
queries = [f"SELECT {i}" for i in range(5)]
results = await asyncio.gather(*[pool.execute(q) for q in queries])
print(f"执行了 {len(results)} 个查询")
asyncio.run(main())
示例 2:发布订阅模式
import asyncio
from collections import defaultdict
class PubSub:
def __init__(self):
self.subscribers = defaultdict(list)
def subscribe(self, topic):
queue = asyncio.Queue()
self.subscribers[topic].append(queue)
return queue
async def publish(self, topic, message):
for queue in self.subscribers[topic]:
await queue.put(message)
async def unsubscribe(self, topic, queue):
self.subscribers[topic].remove(queue)
async def subscriber(name, queue):
while True:
message = await queue.get()
print(f"{name} 收到: {message}")
async def main():
pubsub = PubSub()
# 订阅
queue1 = pubsub.subscribe("news")
queue2 = pubsub.subscribe("news")
# 启动订阅者
sub1 = asyncio.create_task(subscriber("Sub1", queue1))
sub2 = asyncio.create_task(subscriber("Sub2", queue2))
# 发布消息
await pubsub.publish("news", "Hello!")
await pubsub.publish("news", "World!")
await asyncio.sleep(1)
sub1.cancel()
sub2.cancel()
asyncio.run(main())
小结
| 同步原语 | 用途 |
|---|---|
Lock | 独占访问共享资源 |
Event | 通知一个或多个协程事件发生 |
Condition | 等待特定条件 |
Semaphore | 限制并发访问数量 |
Barrier | 多协程同步点 |
Queue | 协程间安全通信 |
最佳实践:
- 优先使用
async with语法自动管理锁 - 使用
BoundedSemaphore防止错误释放 - 条件等待使用
while循环防止虚假唤醒 - Queue 配合
task_done()和join()实现任务追踪 - 合理设置信号量大小,避免过度限制并发