跳到主要内容

事件机制

Node.js 采用事件驱动架构,事件模块是 Node.js 的核心模块之一。通过 EventEmitter 类,可以实现发布-订阅模式,处理异步事件。

EventEmitter 基础

导入模块

const EventEmitter = require('node:events');

// 创建事件发射器实例
const emitter = new EventEmitter();

基本使用

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 注册事件监听器
emitter.on('message', (data) => {
console.log('收到消息:', data);
});

// 触发事件
emitter.emit('message', 'Hello, World!');
emitter.emit('message', { text: '你好', from: 'Alice' });

事件监听器方法

方法说明
on(event, listener)添加监听器(可重复添加)
once(event, listener)添加一次性监听器(触发后自动移除)
prependListener(event, listener)在监听器数组开头添加监听器
prependOnceListener(event, listener)在开头添加一次性监听器
removeListener(event, listener)移除指定监听器
removeAllListeners([event])移除所有/指定事件的所有监听器
off(event, listener)removeListener 的别名
emit(event, [...args])触发事件
listeners(event)获取事件的所有监听器
listenerCount(event)获取监听器数量

监听器详解

多个监听器

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 添加多个监听器,按添加顺序执行
emitter.on('login', (user) => {
console.log(`用户 ${user} 登录了`);
});

emitter.on('login', (user) => {
console.log(`记录登录日志: ${user}`);
});

emitter.on('login', (user) => {
console.log(`发送登录通知: ${user}`);
});

emitter.emit('login', 'Alice');

// 输出:
// 用户 Alice 登录了
// 记录登录日志: Alice
// 发送登录通知: Alice

解释

  • 所有监听器按添加顺序同步执行
  • emit() 的返回值表示是否有监听器处理了事件
  • 如果事件没有监听器,emit() 返回 false

一次性监听器

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 只触发一次
emitter.once('init', () => {
console.log('初始化完成');
});

emitter.emit('init'); // 输出: 初始化完成
emitter.emit('init'); // 无输出(监听器已被移除)

移除监听器

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

function onMessage(data) {
console.log('消息:', data);
}

// 添加监听器
emitter.on('message', onMessage);

emitter.emit('message', 'test'); // 输出: 消息: test

// 移除监听器
emitter.off('message', onMessage);

emitter.emit('message', 'test2'); // 无输出

// 移除所有监听器
emitter.removeAllListeners('message');
// 或移除所有事件的所有监听器
emitter.removeAllListeners();

获取监听器信息

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

function listener1() { console.log('listener1'); }
function listener2() { console.log('listener2'); }

emitter.on('event', listener1);
emitter.on('event', listener2);

// 获取监听器数量
console.log(emitter.listenerCount('event')); // 2

// 获取监听器数组
console.log(emitter.listeners('event'));
// [ [Function: listener1], [Function: listener2] ]

// 获取所有事件名
emitter.on('another', () => {});
console.log(emitter.eventNames()); // [ 'event', 'another' ]

错误处理

错误事件

当 EventEmitter 发生错误时,会触发 'error' 事件。如果没有监听器,Node.js 会抛出异常并退出:

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 必须注册错误监听器,否则程序会崩溃
emitter.on('error', (err) => {
console.error('发生错误:', err.message);
});

// 触发错误事件
emitter.emit('error', new Error('出问题了'));

使用 errorMonitor 监控错误

events.errorMonitor 是一个特殊的 Symbol,用于监控 'error' 事件而不消费它:

const { EventEmitter, errorMonitor } = require('node:events');

const emitter = new EventEmitter();

// 使用 errorMonitor 监控所有错误(不会阻止错误抛出)
emitter.on(errorMonitor, (err) => {
console.log('监控到错误:', err.message);
// 可以在这里记录日志或发送到监控系统
});

// 仍然需要普通的 error 监听器来处理错误
emitter.on('error', (err) => {
console.log('处理错误:', err.message);
});

emitter.emit('error', new Error('测试错误'));
// 输出:
// 监控到错误: 测试错误
// 处理错误: 测试错误

应用场景

  • 全局错误日志记录
  • 错误监控和告警系统
  • 错误统计和分析

捕获异步监听器中的 Promise 拒绝

当事件监听器是异步函数时,如果内部抛出异常,可能导致未处理的 Promise 拒绝。captureRejections 选项可以自动捕获这些错误:

const { EventEmitter } = require('node:events');

// 方式一:在构造函数中启用
const emitter = new EventEmitter({ captureRejections: true });

// 异步监听器
emitter.on('data', async (data) => {
// 如果这里抛出异常,会被自动捕获
await processData(data);
});

// 错误会被发送到 'error' 事件
emitter.on('error', (err) => {
console.error('捕获到异步错误:', err.message);
});

// 方式二:使用自定义 rejection 处理方法
const emitter2 = new EventEmitter({ captureRejections: true });

emitter2.on('data', async (data) => {
throw new Error('异步处理失败');
});

// 使用 Symbol.for('nodejs.rejection') 定义处理方法
emitter2[Symbol.for('nodejs.rejection')] = (err, eventName, ...args) => {
console.error(`事件 ${eventName} 的异步错误:`, err.message);
};

// 方式三:全局设置
const events = require('node:events');
events.captureRejections = true; // 影响所有新创建的 EventEmitter

重要提示:不要将异步函数用作 'error' 事件的监听器,这可能导致无限错误循环。

自定义事件发射器

继承 EventEmitter

const EventEmitter = require('node:events');

class UserService extends EventEmitter {
constructor() {
super();
this.users = [];
}

addUser(user) {
this.users.push(user);
// 触发事件
this.emit('userAdded', user);
}

removeUser(userId) {
const index = this.users.findIndex(u => u.id === userId);
if (index !== -1) {
const user = this.users.splice(index, 1)[0];
this.emit('userRemoved', user);
}
}
}

// 使用
const userService = new UserService();

// 订阅事件
userService.on('userAdded', (user) => {
console.log(`用户添加: ${user.name}`);
});

userService.on('userRemoved', (user) => {
console.log(`用户删除: ${user.name}`);
});

// 触发事件
userService.addUser({ id: 1, name: 'Alice' });
userService.removeUser(1);

实际应用示例

const EventEmitter = require('node:events');

class DownloadManager extends EventEmitter {
constructor() {
super();
this.downloads = new Map();
}

startDownload(url) {
const id = Date.now().toString();
const download = {
id,
url,
progress: 0,
status: 'pending'
};

this.downloads.set(id, download);
this.emit('downloadStarted', download);

// 模拟下载进度
const interval = setInterval(() => {
download.progress += 10;
this.emit('progress', download);

if (download.progress >= 100) {
clearInterval(interval);
download.status = 'completed';
this.emit('downloadCompleted', download);
}
}, 500);

return id;
}

cancelDownload(id) {
const download = this.downloads.get(id);
if (download) {
download.status = 'cancelled';
this.emit('downloadCancelled', download);
this.downloads.delete(id);
}
}
}

// 使用
const manager = new DownloadManager();

manager.on('downloadStarted', (download) => {
console.log(`开始下载: ${download.url}`);
});

manager.on('progress', (download) => {
console.log(`下载进度: ${download.progress}%`);
});

manager.on('downloadCompleted', (download) => {
console.log(`下载完成: ${download.url}`);
});

manager.startDownload('https://example.com/file.zip');

异步事件处理

异步监听器

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 异步监听器
emitter.on('data', async (data) => {
// 异步操作不会阻塞 emit
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('处理数据:', data);
});

// emit 不会等待异步监听器完成
emitter.emit('data', 'test');
console.log('emit 之后');

// 输出顺序:
// emit 之后
// 处理数据: test

等待异步监听器

const EventEmitter = require('node:events');

class AsyncEmitter extends EventEmitter {
async emitAsync(event, ...args) {
const listeners = this.listeners(event);

for (const listener of listeners) {
await listener(...args);
}
}
}

// 使用
const emitter = new AsyncEmitter();

emitter.on('save', async (data) => {
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('保存完成:', data);
});

async function main() {
await emitter.emitAsync('save', 'test data');
console.log('所有监听器执行完毕');
}

main();

监听器限制

默认限制

每个事件默认最多 10 个监听器,超过会发出警告:

const EventEmitter = require('node:events');
const emitter = new EventEmitter();

// 设置最大监听器数量
emitter.setMaxListeners(20);

// 获取当前最大监听器数量
console.log(emitter.getMaxListeners()); // 20

// 获取/设置全局默认值
console.log(EventEmitter.defaultMaxListeners); // 10
EventEmitter.defaultMaxListeners = 20;

内存泄漏警告

当监听器数量超过限制时,会发出警告:

(node:12345) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 event listeners added. Use emitter.setMaxListeners() to increase limit

解决方法:

  1. 检查是否真的需要这么多监听器
  2. 确保在不需要时移除监听器
  3. 使用 setMaxListeners() 增加限制

高级特性

events.once() 和 events.on() 静态方法

Node.js 提供了静态方法,用于以 Promise 方式等待事件:

const { EventEmitter, once, on } = require('node:events');

async function main() {
const emitter = new EventEmitter();

// 使用 once 等待单个事件
setTimeout(() => emitter.emit('ready', '数据'), 1000);

const data = await once(emitter, 'ready');
console.log('收到:', data); // ['数据']

// 使用 on 创建异步迭代器
setTimeout(() => emitter.emit('data', 1), 100);
setTimeout(() => emitter.emit('data', 2), 200);
setTimeout(() => emitter.emit('data', 3), 300);
setTimeout(() => emitter.emit('end'), 400);

// 异步迭代事件
const asyncIterator = on(emitter, 'data');

for await (const value of asyncIterator) {
console.log('迭代:', value);
if (value[0] >= 2) break; // 可以随时停止
}
}

main();

rawListeners() 获取原始监听器

rawListeners() 返回监听器数组的副本,包括包装器(如 .once() 创建的):

const { EventEmitter } = require('node:events');

const emitter = new EventEmitter();

emitter.once('event', () => console.log('只执行一次'));

// listeners() 返回包装后的函数
console.log(emitter.listeners('event')[0].name); // onceWrapper

// rawListeners() 返回原始数组
const raw = emitter.rawListeners('event');
console.log(raw[0].listener); // 原始监听器函数

// 可以直接调用原始监听器(不会自动移除)
raw[0].listener(); // 执行
raw[0].listener(); // 再次执行(不会自动移除)

EventEmitter 静态方法

const { EventEmitter, getEventListeners, setMaxListeners } = require('node:events');

const emitter = new EventEmitter();
emitter.on('event', () => {});
emitter.on('event', () => {});

// 获取监听器数组
const listeners = getEventListeners(emitter, 'event');
console.log(listeners.length); // 2

// 获取最大监听器数量
const max = EventEmitter.getMaxListeners(emitter);
console.log(max); // 10

// 设置最大监听器数量
setMaxListeners(20, emitter);

AbortController 与事件结合

Node.js 18+ 支持 events.addAbortListener,可以方便地处理可中止的操作:

const { EventEmitter, addAbortListener } = require('node:events');

const controller = new AbortController();
const emitter = new EventEmitter();

// 监听 abort 事件
const disposable = addAbortListener(controller.signal, () => {
console.log('操作被中止');
emitter.emit('cancelled');
});

// 取消操作
controller.abort();

// 清理监听器
disposable[Symbol.dispose]();

EventTarget 和 Event API

Node.js 提供了 EventTargetEvent API,这是 Web 标准的事件接口,与浏览器的 DOM 事件类似。

EventTarget 基本用法

const { EventTarget, Event } = require('node:events');

const target = new EventTarget();

// 添加监听器
target.addEventListener('message', (event) => {
console.log('收到消息:', event.detail);
});

// 触发事件
const event = new Event('message');
event.detail = { text: 'Hello' };
target.dispatchEvent(event);

CustomEvent 自定义事件

const { EventTarget, CustomEvent } = require('node:events');

const target = new EventTarget();

target.addEventListener('custom', (event) => {
console.log('自定义事件:', event.detail);
});

// CustomEvent 可以直接传递数据
const customEvent = new CustomEvent('custom', {
detail: { name: 'Alice', age: 25 }
});

target.dispatchEvent(customEvent);

NodeEventTarget 混合类型

NodeEventTarget 是 Node.js 扩展的 EventTarget,同时支持 EventEmitter 风格的 API:

const { NodeEventTarget } = require('node:events');

const target = new NodeEventTarget();

// EventEmitter 风格
target.on('event', (data) => console.log('EE 风格:', data));
target.emit('event', '数据');

// EventTarget 风格
target.addEventListener('event', (e) => console.log('ET 风格:', e));
target.dispatchEvent(new Event('event'));

EventEmitter 与 EventTarget 对比

// EventEmitter 示例
const EventEmitter = require('node:events');
const emitter = new EventEmitter();

emitter.on('data', (a, b, c) => {
console.log('EventEmitter:', a, b, c);
});

emitter.emit('data', 1, 2, 3);
// 输出: EventEmitter: 1 2 3

// EventTarget 示例
const { EventTarget, Event } = require('node:events');
const target = new EventTarget();

target.addEventListener('data', (event) => {
console.log('EventTarget:', event.detail);
});

const event = new Event('data');
event.detail = [1, 2, 3];
target.dispatchEvent(event);
// 输出: EventTarget: [1, 2, 3]

主要差异

特性EventEmitterEventTarget
监听器类型函数函数或对象
事件对象无(直接传参)Event 对象
错误处理必须监听 error不特殊处理 error
监听器数量限制有(默认 10)无限制
返回值链式调用无返回值
适用场景Node.js 内部跨平台代码

事件驱动架构最佳实践

错误处理模式

使用异步监听器时,需要特别注意错误处理:

const { EventEmitter } = require('node:events');

class AsyncEventEmitter extends EventEmitter {
constructor() {
super({ captureRejections: true });
}

async emitAsync(event, ...args) {
const listeners = this.listeners(event);
const results = [];

for (const listener of listeners) {
try {
const result = await listener(...args);
results.push(result);
} catch (error) {
this.emit('error', error);
}
}

return results;
}
}

// 使用示例
const emitter = new AsyncEventEmitter();

emitter.on('process', async (data) => {
await processData(data);
});

emitter.on('error', (err) => {
console.error('处理失败:', err);
});

await emitter.emitAsync('process', { id: 1 });

事件驱动中间件模式

const { EventEmitter } = require('node:events');

class Application extends EventEmitter {
constructor() {
super({ captureRejections: true });
this.middleware = [];
}

use(fn) {
this.middleware.push(fn);
return this;
}

async handleRequest(req, res) {
const ctx = { req, res, app: this };

try {
this.emit('request:start', ctx);

for (const fn of this.middleware) {
await fn(ctx);
}

this.emit('request:end', ctx);
} catch (error) {
this.emit('request:error', { ctx, error });
}
}
}

// 使用示例
const app = new Application();

app.use(async (ctx) => {
console.log('中间件 1');
ctx.body = 'Hello';
});

app.use(async (ctx) => {
console.log('中间件 2');
ctx.body += ' World';
});

app.on('request:start', (ctx) => {
console.log(`请求开始: ${ctx.req.url}`);
});

app.on('request:error', ({ ctx, error }) => {
console.error('请求错误:', error);
});

小结

本章我们学习了:

  1. EventEmitter 基础:创建、注册监听器、触发事件
  2. 监听器管理:添加、移除、一次性监听器
  3. 错误处理:处理 error 事件、errorMonitor、captureRejections
  4. 自定义事件发射器:继承 EventEmitter 创建事件驱动类
  5. 异步事件:处理异步监听器、最佳实践
  6. 监听器限制:避免内存泄漏
  7. 高级特性:once/on 静态方法、rawListeners、EventTarget API
  8. EventTarget API:Web 标准事件接口

练习

  1. 创建一个计时器类,支持 start、pause、stop 事件
  2. 实现一个简单的事件总线,支持跨模块通信
  3. 创建一个任务队列,使用事件通知任务状态变化
  4. 实现一个文件监听器,当文件变化时发出事件
  5. 使用 events.errorMonitor 实现错误监控系统
  6. 创建一个支持异步监听器的 EventEmitter,使用 captureRejections 捕获错误
  7. 实现 EventTarget 版本的简单事件系统,对比与 EventEmitter 的差异
  8. 使用 events.on() 实现一个异步事件流处理器