跳到主要内容

事件循环

事件循环(Event Loop)是 asyncio 的核心引擎,它负责调度和执行所有异步任务、处理 I/O 事件、运行回调函数。理解事件循环的工作原理,有助于深入掌握异步编程的本质。

什么是事件循环?

事件循环是一个不断运行的程序结构,它持续监控各种事件源(如网络连接、定时器、文件描述符),当事件发生时,调用相应的处理函数。

可以把事件循环想象成一个"任务调度中心":

┌─────────────────────────────────────────────────────────┐
│ 事件循环 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 任务队列 │ │ I/O 监控 │ │ 定时器 │ │
│ │ │ │ │ │ │ │
│ │ Task 1 │ │ Socket 1 │ │ Timer 1 │ │
│ │ Task 2 │ │ Socket 2 │ │ Timer 2 │ │
│ │ Task 3 │ │ File FD │ │ Timer 3 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 事件分发器 │ │
│ │ (Dispatcher) │ │
│ └───────┬────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 执行回调/协程 │ │
│ └────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘

事件循环的工作流程

事件循环的工作是一个无限循环的过程:

  1. 检查就绪的任务:查看哪些任务可以执行
  2. 执行任务:运行任务直到遇到 await 或完成
  3. 处理 I/O 事件:检查网络、文件等 I/O 是否就绪
  4. 处理定时器:执行到期的定时回调
  5. 重复:回到步骤 1
# 事件循环的简化伪代码
while True:
# 1. 执行就绪的任务
for task in ready_tasks:
run_until_await(task)

# 2. 检查 I/O 事件
io_events = check_io_events()
for event in io_events:
callback = get_callback(event)
callback()

# 3. 处理定时器
expired_timers = get_expired_timers()
for timer in expired_timers:
timer.callback()

# 4. 如果没有任何事情可做,等待
if nothing_to_do():
wait_for_events()

为什么是单线程?

asyncio 使用单线程模型,这与多线程并发有本质区别:

多线程:多个线程同时执行,操作系统负责调度。线程切换有开销,且需要处理线程安全问题。

asyncio 单线程:只有一个线程执行代码,通过协程的协作式切换来实现并发。协程主动让出控制权(await),其他协程才能执行。

单线程模型的优势:

  1. 无竞争条件:不需要锁来保护共享资源(在同一个事件循环内)
  2. 切换开销小:协程切换只是函数调用的开销,远小于线程切换
  3. 可预测:协程切换点明确(await 语句),更容易推理程序行为

但这也意味着:如果一个协程长时间占用 CPU,其他协程就无法执行。所以 asyncio 适合 I/O 密集型任务,不适合 CPU 密集型任务。

获取事件循环

asyncio.run() - 推荐方式

asyncio.run() 是运行异步程序的标准入口(Python 3.7+),它会自动创建和管理事件循环:

import asyncio

async def main():
print("Hello asyncio!")

# 自动创建事件循环、运行协程、关闭事件循环
asyncio.run(main())

asyncio.run() 做了什么:

  1. 创建新的事件循环
  2. 设置为当前线程的事件循环
  3. 运行传入的协程
  4. 关闭事件循环(取消所有任务、清理资源)
  5. 返回协程的结果

注意:同一个程序中只能调用一次 asyncio.run(),不能在已有事件循环中再次调用。

get_running_loop() - 在协程中获取

在协程内部,使用 asyncio.get_running_loop() 获取当前正在运行的事件循环:

import asyncio

async def main():
loop = asyncio.get_running_loop()
print(f"事件循环: {loop}")
print(f"是否运行中: {loop.is_running()}")

asyncio.run(main())

这是最安全的方式,因为它确保一定有正在运行的事件循环。

get_event_loop() - 已弃用的方式

在旧代码中可能看到 asyncio.get_event_loop(),但这个方法在 Python 3.10+ 中已被标记为不推荐:

# 不推荐的方式
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

这种方式容易出错,因为它可能在某些情况下自动创建事件循环,导致混乱。现代代码应该使用 asyncio.run()get_running_loop()

事件循环的核心方法

run_until_complete()

运行直到给定的协程或 Future 完成:

import asyncio

async def work():
await asyncio.sleep(1)
return "完成"

loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(work())
print(result) # "完成"
finally:
loop.close()

run_until_complete() 会阻塞当前线程,直到协程执行完毕。返回协程的结果或抛出异常。

run_forever()

无限运行事件循环,直到调用 stop()

import asyncio

async def periodic():
"""每秒执行一次的周期任务"""
while True:
print("执行中...")
await asyncio.sleep(1)

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# 创建任务
task = loop.create_task(periodic())

try:
loop.run_forever() # 无限运行
except KeyboardInterrupt:
print("用户中断")
finally:
task.cancel() # 取消任务
loop.close()

stop()

停止事件循环:

import asyncio

async def stop_after(delay):
"""延迟后停止事件循环"""
await asyncio.sleep(delay)
loop = asyncio.get_running_loop()
loop.stop()

loop = asyncio.new_event_loop()
loop.create_task(stop_after(3))
loop.run_forever() # 3秒后被 stop() 停止
loop.close()

is_running() 和 is_closed()

检查事件循环状态:

import asyncio

loop = asyncio.new_event_loop()
print(f"运行中: {loop.is_running()}") # False
print(f"已关闭: {loop.is_closed()}") # False

loop.run_until_complete(asyncio.sleep(0.1))
print(f"运行中: {loop.is_running()}") # False(已执行完毕)
print(f"已关闭: {loop.is_closed()}") # False

loop.close()
print(f"已关闭: {loop.is_closed()}") # True

调度回调函数

事件循环可以调度普通函数(非协程)在未来某个时间点执行。

call_soon() - 尽快执行

在事件循环的下一次迭代执行回调:

import asyncio

def callback(name):
print(f"回调执行: {name}")

async def main():
loop = asyncio.get_running_loop()

# 调度三个回调
loop.call_soon(callback, "第一个")
loop.call_soon(callback, "第二个")
loop.call_soon(callback, "第三个")

print("回调已调度")
await asyncio.sleep(0) # 让出控制权,让回调执行
print("协程继续")

asyncio.run(main())

输出:

回调已调度
回调执行: 第一个
回调执行: 第二个
回调执行: 第三个
协程继续

回调按调度顺序执行。

call_later() - 延迟执行

在指定延迟后执行回调:

import asyncio

def callback():
print("延迟回调执行")

async def main():
loop = asyncio.get_running_loop()

# 2 秒后执行
handle = loop.call_later(2, callback)

print("等待回调...")
await asyncio.sleep(3)
print("结束")

asyncio.run(main())

call_later() 返回一个 TimerHandle 对象,可以用来取消回调:

handle = loop.call_later(5, callback)
# ... 如果需要取消
handle.cancel()

call_at() - 在指定时间执行

在事件循环的特定时间点执行回调:

import asyncio

def callback():
print("定时回调执行")

async def main():
loop = asyncio.get_running_loop()

# 获取当前时间
now = loop.time()

# 在 1 秒后的时间点执行
loop.call_at(now + 1, callback)

print(f"当前时间: {now}")
await asyncio.sleep(2)

asyncio.run(main())

loop.time() 返回事件循环的内部单调时钟(与 time.time() 不同),用于 call_at() 的时间参数。

在线程池中执行阻塞操作

事件循环提供了在线程池中执行阻塞函数的能力,避免阻塞主线程。

run_in_executor()

将阻塞函数放到线程池执行:

import asyncio
import time

def blocking_io():
"""模拟阻塞 I/O 操作"""
print("开始阻塞操作")
time.sleep(3) # 这会阻塞线程
print("阻塞操作完成")
return "结果"

async def main():
loop = asyncio.get_running_loop()

print("提交阻塞操作到线程池")

# 在默认线程池中执行
result = await loop.run_in_executor(None, blocking_io)

print(f"得到结果: {result}")

asyncio.run(main())

run_in_executor(None, func) 使用默认的 ThreadPoolExecutor。也可以指定自定义执行器:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
result = await loop.run_in_executor(executor, blocking_io)

asyncio.to_thread() - 更简洁的方式

Python 3.9+ 提供了更简洁的语法:

import asyncio
import time

def blocking_io(name):
time.sleep(1)
return f"{name} 完成"

async def main():
# 并发执行多个阻塞操作
results = await asyncio.gather(
asyncio.to_thread(blocking_io, "任务1"),
asyncio.to_thread(blocking_io, "任务2"),
asyncio.to_thread(blocking_io, "任务3"),
)
print(results)

asyncio.run(main())

创建任务和 Future

create_task()

在事件循环上创建任务:

import asyncio

async def worker():
await asyncio.sleep(1)
return "工作完成"

async def main():
loop = asyncio.get_running_loop()

# 方式一:通过事件循环创建
task1 = loop.create_task(worker())

# 方式二:通过 asyncio 模块(更常用)
task2 = asyncio.create_task(worker())

results = await asyncio.gather(task1, task2)
print(results)

asyncio.run(main())

create_future()

创建与事件循环关联的 Future 对象:

import asyncio

async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()

# 模拟异步设置结果
async def set_result():
await asyncio.sleep(1)
future.set_result("Future 结果")

asyncio.create_task(set_result())

# 等待 Future
result = await future
print(result)

asyncio.run(main())

多线程与事件循环

虽然 asyncio 是单线程的,但可以在多个线程中运行多个事件循环:

import asyncio
import threading

async def worker(name):
print(f"{name} 在线程 {threading.current_thread().name}")
await asyncio.sleep(1)
return f"{name} 完成"

def run_in_thread(name):
"""在新线程中运行事件循环"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(worker(name))
print(f"结果: {result}")
finally:
loop.close()

# 启动两个线程
t1 = threading.Thread(target=run_in_thread, args=("线程1",))
t2 = threading.Thread(target=run_in_thread, args=("线程2",))

t1.start()
t2.start()
t1.join()
t2.join()

线程安全的回调调度

从其他线程向事件循环提交任务:

import asyncio
import threading

async def main():
loop = asyncio.get_running_loop()

# 在另一个线程中调用
def thread_func():
print("另一个线程尝试调度回调")
# 必须使用 call_soon_threadsafe
loop.call_soon_threadsafe(
lambda: print("回调在事件循环线程执行")
)

t = threading.Thread(target=thread_func)
t.start()

await asyncio.sleep(1)
t.join()

asyncio.run(main())

call_soon_threadsafe() 是唯一线程安全的调度方法,从其他线程操作事件循环时必须使用它。

信号处理

事件循环可以注册 Unix 信号处理器,让程序能够优雅地响应系统信号,例如用户按下 Ctrl+C 或系统发送终止信号。

基本用法

使用 add_signal_handler() 注册信号处理器:

import asyncio
import signal

async def main():
loop = asyncio.get_running_loop()

# 定义信号处理函数
def handle_sigint():
print("\n收到 SIGINT 信号,正在停止...")
# 停止事件循环
loop.stop()

def handle_sigterm():
print("收到 SIGTERM 信号,正在清理...")
loop.stop()

# 注册信号处理器
loop.add_signal_handler(signal.SIGINT, handle_sigint)
loop.add_signal_handler(signal.SIGTERM, handle_sigterm)

print("程序运行中,按 Ctrl+C 停止")

# 模拟长时间运行
try:
while True:
await asyncio.sleep(1)
print("工作中...")
except asyncio.CancelledError:
print("任务被取消")

# 注意:信号处理只在 Unix 系统上有效
# Windows 上需要使用其他方式
asyncio.run(main())

优雅关闭示例

对于需要清理资源的服务器程序,信号处理尤其重要:

import asyncio
import signal

class GracefulServer:
"""支持优雅关闭的服务器"""

def __init__(self):
self.running = True
self.tasks = set()

async def worker(self, name):
"""工作协程"""
try:
while self.running:
print(f"{name} 工作中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"{name} 被取消,正在清理...")
await asyncio.sleep(0.5) # 模拟清理
print(f"{name} 清理完成")

async def run(self):
loop = asyncio.get_running_loop()

# 注册信号处理器
def shutdown():
print("\n开始优雅关闭...")
self.running = False

# 取消所有任务
for task in self.tasks:
task.cancel()

# 只在 Unix 系统上注册
try:
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
except NotImplementedError:
# Windows 不支持 add_signal_handler
print("警告:当前系统不支持信号处理")

# 启动多个工作协程
for i in range(3):
task = asyncio.create_task(self.worker(f"Worker-{i}"))
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)

# 等待所有任务完成
await asyncio.gather(*self.tasks, return_exceptions=True)
print("服务器已关闭")

asyncio.run(GracefulServer().run())

移除信号处理器

使用 remove_signal_handler() 移除已注册的处理器:

import asyncio
import signal

async def main():
loop = asyncio.get_running_loop()

def handler():
print("收到信号")

# 注册处理器
loop.add_signal_handler(signal.SIGUSR1, handler)

# 移除处理器
loop.remove_signal_handler(signal.SIGUSR1)

# 移除处理器后,信号将使用默认行为

asyncio.run(main())

Windows 系统的注意事项

Windows 系统不完全支持 add_signal_handler()。对于跨平台应用,可以使用以下方式:

import asyncio
import signal
import sys

async def main():
loop = asyncio.get_running_loop()
shutdown_event = asyncio.Event()

def shutdown():
print("正在关闭...")
shutdown_event.set()

# Unix 系统:使用 add_signal_handler
if sys.platform != 'win32':
loop.add_signal_handler(signal.SIGINT, shutdown)
loop.add_signal_handler(signal.SIGTERM, shutdown)
else:
# Windows:只能在主线程中捕获 KeyboardInterrupt
# 建议使用其他方式,如控制台输入
pass

# 等待关闭信号
await shutdown_event.wait()
print("程序结束")

asyncio.run(main())

常用信号

信号说明典型用途
SIGINT中断信号(Ctrl+C)用户主动中断程序
SIGTERM终止信号系统或进程管理器请求终止
SIGHUP挂起信号通常用于重新加载配置
SIGUSR1/SIGUSR2用户自定义信号自定义功能

注意SIGKILL(信号 9)无法被捕获,程序会立即终止。

事件循环的两种实现

asyncio 在不同操作系统上使用不同的底层实现:

SelectorEventLoop(Unix 默认)

使用 selectpollepollkqueue 系统调用监控 I/O 事件。适用于 Unix 系统。

ProactorEventLoop(Windows 默认)

使用 Windows 的 I/O 完成端口(IOCP)实现。对 Windows 上的高性能网络应用更友好。

import asyncio
import sys

# 查看当前使用的实现
loop = asyncio.new_event_loop()
print(f"事件循环类型: {type(loop).__name__}")
loop.close()

实践示例

示例 1:带超时的任务执行

import asyncio

async def slow_task():
await asyncio.sleep(10)
return "完成"

async def main():
loop = asyncio.get_running_loop()
task = loop.create_task(slow_task())

try:
# 使用 wait_for 添加超时
result = await asyncio.wait_for(task, timeout=2)
except asyncio.TimeoutError:
print("任务超时")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")

asyncio.run(main())

示例 2:定时器实现

import asyncio

class AsyncTimer:
"""异步定时器"""

def __init__(self, interval, callback):
self.interval = interval
self.callback = callback
self._handle = None

def start(self):
loop = asyncio.get_running_loop()
self._handle = loop.call_later(self.interval, self._run)

def _run(self):
self.callback()
# 重新调度
self.start()

def stop(self):
if self._handle:
self._handle.cancel()

async def main():
timer = AsyncTimer(1, lambda: print("滴答"))
timer.start()

await asyncio.sleep(5)
timer.stop()
print("定时器停止")

asyncio.run(main())

示例 3:优雅关闭

import asyncio

async def background_task():
try:
while True:
print("后台任务运行中...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("后台任务清理中...")
await asyncio.sleep(0.5) # 模拟清理
print("后台任务已停止")

async def main():
task = asyncio.create_task(background_task())

# 运行一段时间
await asyncio.sleep(3)

# 取消任务
task.cancel()

# 等待任务处理取消
try:
await task
except asyncio.CancelledError:
pass

print("程序结束")

asyncio.run(main())

小结

事件循环是 asyncio 的核心,它的工作原理可以总结为:

概念说明
事件循环调度和执行异步任务的核心引擎
单线程模型协作式多任务,协程主动让出控制权
asyncio.run()创建和管理事件循环的标准入口
call_soon/later/at调度回调函数执行
run_in_executor在线程池中执行阻塞操作
call_soon_threadsafe线程安全的回调调度

最佳实践:

  1. 使用 asyncio.run() 作为程序入口,不要手动创建事件循环
  2. 在协程内部使用 get_running_loop() 获取事件循环
  3. 避免在协程中执行阻塞操作,使用 to_thread()run_in_executor()
  4. 从其他线程操作事件循环时使用 call_soon_threadsafe()
  5. 记得在关闭前取消所有任务并等待清理完成

理解事件循环的工作原理,能帮助你更好地设计异步程序,避免常见的陷阱。