事件循环
事件循环(Event Loop)是 asyncio 的核心引擎,它负责调度和执行所有异步任务、处理 I/O 事件、运行回调函数。理解事件循环的工作原理,有助于深入掌握异步编程的本质。
什么是事件循环?
事件循环是一个不断运行的程序结构,它持续监控各种事件源(如网络连接、定时器、文件描述符),当事件发生时,调用相应的处理函数。
可以把事件循环想象成一个"任务调度中心":
┌─────────────────────────────────────────────────────────┐
│ 事件循环 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 任务队列 │ │ I/O 监控 │ │ 定时器 │ │
│ │ │ │ │ │ │ │
│ │ Task 1 │ │ Socket 1 │ │ Timer 1 │ │
│ │ Task 2 │ │ Socket 2 │ │ Timer 2 │ │
│ │ Task 3 │ │ File FD │ │ Timer 3 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 事件分发器 │ │
│ │ (Dispatcher) │ │
│ └───────┬────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 执行回调/协程 │ │
│ └────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
事件循环的工作流程
事件循环的工作是一个无限循环的过程:
- 检查就绪的任务:查看哪些任务可以执行
- 执行任务:运行任务直到遇到
await或完成 - 处理 I/O 事件:检查网络、文件等 I/O 是否就绪
- 处理定时器:执行到期的定时回调
- 重复:回到步骤 1
# 事件循环的简化伪代码
while True:
# 1. 执行就绪的任务
for task in ready_tasks:
run_until_await(task)
# 2. 检查 I/O 事件
io_events = check_io_events()
for event in io_events:
callback = get_callback(event)
callback()
# 3. 处理定时器
expired_timers = get_expired_timers()
for timer in expired_timers:
timer.callback()
# 4. 如果没有任何事情可做,等待
if nothing_to_do():
wait_for_events()
为什么是单线程?
asyncio 使用单线程模型,这与多线程并发有本质区别:
多线程:多个线程同时执行,操作系统负责调度。线程切换有开销,且需要处理线程安全问题。
asyncio 单线程:只有一个线程执行代码,通过协程的协作式切换来实现并发。协程主动让出控制权(await),其他协程才能执行。
单线程模型的优势:
- 无竞争条件:不需要锁来保护共享资源(在同一个事件循环内)
- 切换开销小:协程切换只是函数调用的开销,远小于线程切换
- 可预测:协程切换点明确(
await语句),更容易推理程序行为
但这也意味着:如果一个协程长时间占用 CPU,其他协程就无法执行。所以 asyncio 适合 I/O 密集型任务,不适合 CPU 密集型任务。
获取事件循环
asyncio.run() - 推荐方式
asyncio.run() 是运行异步程序的标准入口(Python 3.7+),它会自动创建和管理事件循环:
import asyncio
async def main():
print("Hello asyncio!")
# 自动创建事件循环、运行协程、关闭事件循环
asyncio.run(main())
asyncio.run() 做了什么:
- 创建新的事件循环
- 设置为当前线程的事件循环
- 运行传入的协程
- 关闭事件循环(取消所有任务、清理资源)
- 返回协程的结果
注意:同一个程序中只能调用一次 asyncio.run(),不能在已有事件循环中再次调用。
get_running_loop() - 在协程中获取
在协程内部,使用 asyncio.get_running_loop() 获取当前正在运行的事件循环:
import asyncio
async def main():
loop = asyncio.get_running_loop()
print(f"事件循环: {loop}")
print(f"是否运行中: {loop.is_running()}")
asyncio.run(main())
这是最安全的方式,因为它确保一定有正在运行的事件循环。
get_event_loop() - 已弃用的方式
在旧代码中可能看到 asyncio.get_event_loop(),但这个方法在 Python 3.10+ 中已被标记为不推荐:
# 不推荐的方式
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
这种方式容易出错,因为它可能在某些情况下自动创建事件循环,导致混乱。现代代码应该使用 asyncio.run() 或 get_running_loop()。
事件循环的核心方法
run_until_complete()
运行直到给定的协程或 Future 完成:
import asyncio
async def work():
await asyncio.sleep(1)
return "完成"
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(work())
print(result) # "完成"
finally:
loop.close()
run_until_complete() 会阻塞当前线程,直到协程执行完毕。返回协程的结果或抛出异常。
run_forever()
无限运行事件循环,直到调用 stop():
import asyncio
async def periodic():
"""每秒执行一次的周期任务"""
while True:
print("执行中...")
await asyncio.sleep(1)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 创建任务
task = loop.create_task(periodic())
try:
loop.run_forever() # 无限运行
except KeyboardInterrupt:
print("用户中断")
finally:
task.cancel() # 取消任务
loop.close()
stop()
停止事件循环:
import asyncio
async def stop_after(delay):
"""延迟后停止事件循环"""
await asyncio.sleep(delay)
loop = asyncio.get_running_loop()
loop.stop()
loop = asyncio.new_event_loop()
loop.create_task(stop_after(3))
loop.run_forever() # 3秒后被 stop() 停止
loop.close()
is_running() 和 is_closed()
检查事件循环状态:
import asyncio
loop = asyncio.new_event_loop()
print(f"运行中: {loop.is_running()}") # False
print(f"已关闭: {loop.is_closed()}") # False
loop.run_until_complete(asyncio.sleep(0.1))
print(f"运行中: {loop.is_running()}") # False(已执行完毕)
print(f"已关闭: {loop.is_closed()}") # False
loop.close()
print(f"已关闭: {loop.is_closed()}") # True
调度回调函数
事件循环可以调度普通函数(非协程)在未来某个时间点执行。
call_soon() - 尽快执行
在事件循环的下一次迭代执行回调:
import asyncio
def callback(name):
print(f"回调执行: {name}")
async def main():
loop = asyncio.get_running_loop()
# 调度三个回调
loop.call_soon(callback, "第一个")
loop.call_soon(callback, "第二个")
loop.call_soon(callback, "第三个")
print("回调已调度")
await asyncio.sleep(0) # 让出控制权,让回调执行
print("协程继续")
asyncio.run(main())
输出:
回调已调度
回调执行: 第一个
回调执行: 第二个
回调执行: 第三个
协程继续
回调按调度顺序执行。
call_later() - 延迟执行
在指定延迟后执行回调:
import asyncio
def callback():
print("延迟回调执行")
async def main():
loop = asyncio.get_running_loop()
# 2 秒后执行
handle = loop.call_later(2, callback)
print("等待回调...")
await asyncio.sleep(3)
print("结束")
asyncio.run(main())
call_later() 返回一个 TimerHandle 对象,可以用来取消回调:
handle = loop.call_later(5, callback)
# ... 如果需要取消
handle.cancel()
call_at() - 在指定时间执行
在事件循环的特定时间点执行回调:
import asyncio
def callback():
print("定时回调执行")
async def main():
loop = asyncio.get_running_loop()
# 获取当前时间
now = loop.time()
# 在 1 秒后的时间点执行
loop.call_at(now + 1, callback)
print(f"当前时间: {now}")
await asyncio.sleep(2)
asyncio.run(main())
loop.time() 返回事件循环的内部单调时钟(与 time.time() 不同),用于 call_at() 的时间参数。
在线程池中执行阻塞操作
事件循环提供了在线程池中执行阻塞函数的能力,避免阻塞主线程。
run_in_executor()
将阻塞函数放到线程池执行:
import asyncio
import time
def blocking_io():
"""模拟阻塞 I/O 操作"""
print("开始阻塞操作")
time.sleep(3) # 这会阻塞线程
print("阻塞操作完成")
return "结果"
async def main():
loop = asyncio.get_running_loop()
print("提交阻塞操作到线程池")
# 在默认线程池中执行
result = await loop.run_in_executor(None, blocking_io)
print(f"得到结果: {result}")
asyncio.run(main())
run_in_executor(None, func) 使用默认的 ThreadPoolExecutor。也可以指定自定义执行器:
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=4)
result = await loop.run_in_executor(executor, blocking_io)
asyncio.to_thread() - 更简洁的方式
Python 3.9+ 提供了更简洁的语法:
import asyncio
import time
def blocking_io(name):
time.sleep(1)
return f"{name} 完成"
async def main():
# 并发执行多个阻塞操作
results = await asyncio.gather(
asyncio.to_thread(blocking_io, "任务1"),
asyncio.to_thread(blocking_io, "任务2"),
asyncio.to_thread(blocking_io, "任务3"),
)
print(results)
asyncio.run(main())
创建任务和 Future
create_task()
在事件循环上创建任务:
import asyncio
async def worker():
await asyncio.sleep(1)
return "工作完成"
async def main():
loop = asyncio.get_running_loop()
# 方式一:通过事件循环创建
task1 = loop.create_task(worker())
# 方式二:通过 asyncio 模块(更常用)
task2 = asyncio.create_task(worker())
results = await asyncio.gather(task1, task2)
print(results)
asyncio.run(main())
create_future()
创建与事件循环关联的 Future 对象:
import asyncio
async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()
# 模拟异步设置结果
async def set_result():
await asyncio.sleep(1)
future.set_result("Future 结果")
asyncio.create_task(set_result())
# 等待 Future
result = await future
print(result)
asyncio.run(main())
多线程与事件循环
虽然 asyncio 是单线程的,但可以在多个线程中运行多个事件循环:
import asyncio
import threading
async def worker(name):
print(f"{name} 在线程 {threading.current_thread().name}")
await asyncio.sleep(1)
return f"{name} 完成"
def run_in_thread(name):
"""在新线程中运行事件循环"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(worker(name))
print(f"结果: {result}")
finally:
loop.close()
# 启动两个线程
t1 = threading.Thread(target=run_in_thread, args=("线程1",))
t2 = threading.Thread(target=run_in_thread, args=("线程2",))
t1.start()
t2.start()
t1.join()
t2.join()
线程安全的回调调度
从其他线程向事件循环提交任务:
import asyncio
import threading
async def main():
loop = asyncio.get_running_loop()
# 在另一个线程中调用
def thread_func():
print("另一个线程尝试调度回调")
# 必须使用 call_soon_threadsafe
loop.call_soon_threadsafe(
lambda: print("回调在事件循环线程执行")
)
t = threading.Thread(target=thread_func)
t.start()
await asyncio.sleep(1)
t.join()
asyncio.run(main())
call_soon_threadsafe() 是唯一线程安全的调度方法,从其他线程操作事件循环时必须使用它。
信号处理
事件循环可以注册 Unix 信号处理器,让程序能够优雅地响应系统信号,例如用户按下 Ctrl+C 或系统发送终止信号。
基本用法
使用 add_signal_handler() 注册信号处理器:
import asyncio
import signal
async def main():
loop = asyncio.get_running_loop()
# 定义信号处理函数
def handle_sigint():
print("\n收到 SIGINT 信号,正在停止...")
# 停止事件循环
loop.stop()
def handle_sigterm():
print("收到 SIGTERM 信号,正在清理...")
loop.stop()
# 注册信号处理器
loop.add_signal_handler(signal.SIGINT, handle_sigint)
loop.add_signal_handler(signal.SIGTERM, handle_sigterm)
print("程序运行中,按 Ctrl+C 停止")
# 模拟长时间运行
try:
while True:
await asyncio.sleep(1)
print("工作中...")
except asyncio.CancelledError:
print("任务被取消")
# 注意:信号处理只在 Unix 系统上有效
# Windows 上需要使用其他方式
asyncio.run(main())
优雅关闭示例
对于需要清理资源的服务器程序,信号处理尤其重要:
import asyncio
import signal
class GracefulServer:
"""支持优雅关闭的服务器"""
def __init__(self):
self.running = True
self.tasks = set()
async def worker(self, name):
"""工作协程"""
try:
while self.running:
print(f"{name} 工作中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"{name} 被取消,正在清理...")
await asyncio.sleep(0.5) # 模拟清理
print(f"{name} 清理完成")
async def run(self):
loop = asyncio.get_running_loop()
# 注册信号处理器
def shutdown():
print("\n开始优雅关闭...")
self.running = False
# 取消所有任务
for task in self.tasks:
task.cancel()
# 只在 Unix 系统上注册
try:
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
except NotImplementedError:
# Windows 不支持 add_signal_handler
print("警告:当前系统不支持信号处理")
# 启动多个工作协程
for i in range(3):
task = asyncio.create_task(self.worker(f"Worker-{i}"))
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
# 等待所有任务完成
await asyncio.gather(*self.tasks, return_exceptions=True)
print("服务器已关闭")
asyncio.run(GracefulServer().run())
移除信号处理器
使用 remove_signal_handler() 移除已注册的处理器:
import asyncio
import signal
async def main():
loop = asyncio.get_running_loop()
def handler():
print("收到信号")
# 注册处理器
loop.add_signal_handler(signal.SIGUSR1, handler)
# 移除处理器
loop.remove_signal_handler(signal.SIGUSR1)
# 移除处理器后,信号将使用默认行为
asyncio.run(main())
Windows 系统的注意事项
Windows 系统不完全支持 add_signal_handler()。对于跨平台应用,可以使用以下方式:
import asyncio
import signal
import sys
async def main():
loop = asyncio.get_running_loop()
shutdown_event = asyncio.Event()
def shutdown():
print("正在关闭...")
shutdown_event.set()
# Unix 系统:使用 add_signal_handler
if sys.platform != 'win32':
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
else:
# Windows:只能在主线程中捕获 KeyboardInterrupt
# 建议使用其他方式,如控制台输入
pass
# 等待关闭信号
await shutdown_event.wait()
print("程序结束")
asyncio.run(main())
常用信号
| 信号 | 说明 | 典型用途 |
|---|---|---|
SIGINT | 中断信号(Ctrl+C) | 用户主动中断程序 |
SIGTERM | 终止信号 | 系统或进程管理器请求终止 |
SIGHUP | 挂起信号 | 通常用于重新加载配置 |
SIGUSR1/SIGUSR2 | 用户自定义信号 | 自定义功能 |
注意:SIGKILL(信号 9)无法被捕获,程序会立即终止。
事件循环的两种实现
asyncio 在不同操作系统上使用不同的底层实现:
SelectorEventLoop(Unix 默认)
使用 select、poll、epoll 或 kqueue 系统调用监控 I/O 事件。适用于 Unix 系统。
ProactorEventLoop(Windows 默认)
使用 Windows 的 I/O 完成端口(IOCP)实现。对 Windows 上的高性能网络应用更友好。
import asyncio
import sys
# 查看当前使用的实现
loop = asyncio.new_event_loop()
print(f"事件循环类型: {type(loop).__name__}")
loop.close()
实践示例
示例 1:带超时的任务执行
import asyncio
async def slow_task():
await asyncio.sleep(10)
return "完成"
async def main():
loop = asyncio.get_running_loop()
task = loop.create_task(slow_task())
try:
# 使用 wait_for 添加超时
result = await asyncio.wait_for(task, timeout=2)
except asyncio.TimeoutError:
print("任务超时")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")
asyncio.run(main())
示例 2:定时器实现
import asyncio
class AsyncTimer:
"""异步定时器"""
def __init__(self, interval, callback):
self.interval = interval
self.callback = callback
self._handle = None
def start(self):
loop = asyncio.get_running_loop()
self._handle = loop.call_later(self.interval, self._run)
def _run(self):
self.callback()
# 重新调度
self.start()
def stop(self):
if self._handle:
self._handle.cancel()
async def main():
timer = AsyncTimer(1, lambda: print("滴答"))
timer.start()
await asyncio.sleep(5)
timer.stop()
print("定时器停止")
asyncio.run(main())
示例 3:优雅关闭
import asyncio
async def background_task():
try:
while True:
print("后台任务运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("后台任务清理中...")
await asyncio.sleep(0.5) # 模拟清理
print("后台任务已停止")
async def main():
task = asyncio.create_task(background_task())
# 运行一段时间
await asyncio.sleep(3)
# 取消任务
task.cancel()
# 等待任务处理取消
try:
await task
except asyncio.CancelledError:
pass
print("程序结束")
asyncio.run(main())
小结
事件循环是 asyncio 的核心,它的工作原理可以总结为:
| 概念 | 说明 |
|---|---|
| 事件循环 | 调度和执行异步任务的核心引擎 |
| 单线程模型 | 协作式多任务,协程主动让出控制权 |
| asyncio.run() | 创建和管理事件循环的标准入口 |
| call_soon/later/at | 调度回调函数执行 |
| run_in_executor | 在线程池中执行阻塞操作 |
| call_soon_threadsafe | 线程安全的回调调度 |
最佳实践:
- 使用
asyncio.run()作为程序入口,不要手动创建事件循环 - 在协程内部使用
get_running_loop()获取事件循环 - 避免在协程中执行阻塞操作,使用
to_thread()或run_in_executor() - 从其他线程操作事件循环时使用
call_soon_threadsafe() - 记得在关闭前取消所有任务并等待清理完成
理解事件循环的工作原理,能帮助你更好地设计异步程序,避免常见的陷阱。