事件机制
Node.js 采用事件驱动架构,事件模块是 Node.js 的核心模块之一。通过 EventEmitter 类,可以实现发布-订阅模式,处理异步事件。
EventEmitter 基础
导入模块
const EventEmitter = require('node:events');
// 创建事件发射器实例
const emitter = new EventEmitter();
基本使用
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 注册事件监听器
emitter.on('message', (data) => {
console.log('收到消息:', data);
});
// 触发事件
emitter.emit('message', 'Hello, World!');
emitter.emit('message', { text: '你好', from: 'Alice' });
事件监听器方法
| 方法 | 说明 |
|---|---|
on(event, listener) | 添加监听器(可重复添加) |
once(event, listener) | 添加一次性监听器(触发后自动移除) |
prependListener(event, listener) | 在监听器数组开头添加监听器 |
prependOnceListener(event, listener) | 在开头添加一次性监听器 |
removeListener(event, listener) | 移除指定监听器 |
removeAllListeners([event]) | 移除所有/指定事件的所有监听器 |
off(event, listener) | removeListener 的别名 |
emit(event, [...args]) | 触发事件 |
listeners(event) | 获取事件的所有监听器 |
listenerCount(event) | 获取监听器数量 |
监听器详解
多个监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 添加多个监听器,按添加顺序执行
emitter.on('login', (user) => {
console.log(`用户 ${user} 登录了`);
});
emitter.on('login', (user) => {
console.log(`记录登录日志: ${user}`);
});
emitter.on('login', (user) => {
console.log(`发送登录通知: ${user}`);
});
emitter.emit('login', 'Alice');
// 输出:
// 用户 Alice 登录了
// 记录登录日志: Alice
// 发送登录通知: Alice
解释:
- 所有监听器按添加顺序同步执行
emit()的返回值表示是否有监听器处理了事件- 如果事件没有监听器,
emit()返回false
一次性监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 只触发一次
emitter.once('init', () => {
console.log('初始化完成');
});
emitter.emit('init'); // 输出: 初始化完成
emitter.emit('init'); // 无输出(监听器已被移除)
移除监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
function onMessage(data) {
console.log('消息:', data);
}
// 添加监听器
emitter.on('message', onMessage);
emitter.emit('message', 'test'); // 输出: 消息: test
// 移除监听器
emitter.off('message', onMessage);
emitter.emit('message', 'test2'); // 无输出
// 移除所有监听器
emitter.removeAllListeners('message');
// 或移除所有事件的所有监听器
emitter.removeAllListeners();
获取监听器信息
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
function listener1() { console.log('listener1'); }
function listener2() { console.log('listener2'); }
emitter.on('event', listener1);
emitter.on('event', listener2);
// 获取监听器数量
console.log(emitter.listenerCount('event')); // 2
// 获取监听器数组
console.log(emitter.listeners('event'));
// [ [Function: listener1], [Function: listener2] ]
// 获取所有事件名
emitter.on('another', () => {});
console.log(emitter.eventNames()); // [ 'event', 'another' ]
错误处理
错误事件
当 EventEmitter 发生错误时,会触发 'error' 事件。如果没有监听器,Node.js 会抛出异常并退出:
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 必须注册错误监听器,否则程序会崩溃
emitter.on('error', (err) => {
console.error('发生错误:', err.message);
});
// 触发错误事件
emitter.emit('error', new Error('出问题了'));
使用 errorMonitor 监控错误
events.errorMonitor 是一个特殊的 Symbol,用于监控 'error' 事件而不消费它:
const { EventEmitter, errorMonitor } = require('node:events');
const emitter = new EventEmitter();
// 使用 errorMonitor 监控所有错误(不会阻止错误抛出)
emitter.on(errorMonitor, (err) => {
console.log('监控到错误:', err.message);
// 可以在这里记录日志或发送到监控系统
});
// 仍然需要普通的 error 监听器来处理错误
emitter.on('error', (err) => {
console.log('处理错误:', err.message);
});
emitter.emit('error', new Error('测试错误'));
// 输出:
// 监控到错误: 测试错误
// 处理错误: 测试错误
应用场景:
- 全局错误日志记录
- 错误监控和告警系统
- 错误统计和分析
捕获异步监听器中的 Promise 拒绝
当事件监听器是异步函数时,如果内部抛出异常,可能导致未处理的 Promise 拒绝。captureRejections 选项可以自动捕获这些错误:
const { EventEmitter } = require('node:events');
// 方式一:在构造函数中启用
const emitter = new EventEmitter({ captureRejections: true });
// 异步监听器
emitter.on('data', async (data) => {
// 如果这里抛出异常,会被自动捕获
await processData(data);
});
// 错误会被发送到 'error' 事件
emitter.on('error', (err) => {
console.error('捕获到异步错误:', err.message);
});
// 方式二:使用自定义 rejection 处理方法
const emitter2 = new EventEmitter({ captureRejections: true });
emitter2.on('data', async (data) => {
throw new Error('异步处理失败');
});
// 使用 Symbol.for('nodejs.rejection') 定义处理方法
emitter2[Symbol.for('nodejs.rejection')] = (err, eventName, ...args) => {
console.error(`事件 ${eventName} 的异步错误:`, err.message);
};
// 方式三:全局设置
const events = require('node:events');
events.captureRejections = true; // 影响所有新创建的 EventEmitter
重要提示:不要将异步函数用作 'error' 事件的监听器,这可能导致无限错误循环。
自定义事件发射器
继承 EventEmitter
const EventEmitter = require('node:events');
class UserService extends EventEmitter {
constructor() {
super();
this.users = [];
}
addUser(user) {
this.users.push(user);
// 触发事件
this.emit('userAdded', user);
}
removeUser(userId) {
const index = this.users.findIndex(u => u.id === userId);
if (index !== -1) {
const user = this.users.splice(index, 1)[0];
this.emit('userRemoved', user);
}
}
}
// 使用
const userService = new UserService();
// 订阅事件
userService.on('userAdded', (user) => {
console.log(`用户添加: ${user.name}`);
});
userService.on('userRemoved', (user) => {
console.log(`用户删除: ${user.name}`);
});
// 触发事件
userService.addUser({ id: 1, name: 'Alice' });
userService.removeUser(1);
实际应用示例
const EventEmitter = require('node:events');
class DownloadManager extends EventEmitter {
constructor() {
super();
this.downloads = new Map();
}
startDownload(url) {
const id = Date.now().toString();
const download = {
id,
url,
progress: 0,
status: 'pending'
};
this.downloads.set(id, download);
this.emit('downloadStarted', download);
// 模拟下载进度
const interval = setInterval(() => {
download.progress += 10;
this.emit('progress', download);
if (download.progress >= 100) {
clearInterval(interval);
download.status = 'completed';
this.emit('downloadCompleted', download);
}
}, 500);
return id;
}
cancelDownload(id) {
const download = this.downloads.get(id);
if (download) {
download.status = 'cancelled';
this.emit('downloadCancelled', download);
this.downloads.delete(id);
}
}
}
// 使用
const manager = new DownloadManager();
manager.on('downloadStarted', (download) => {
console.log(`开始下载: ${download.url}`);
});
manager.on('progress', (download) => {
console.log(`下载进度: ${download.progress}%`);
});
manager.on('downloadCompleted', (download) => {
console.log(`下载完成: ${download.url}`);
});
manager.startDownload('https://example.com/file.zip');
异步事件处理
异步监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 异步监听器
emitter.on('data', async (data) => {
// 异步操作不会阻塞 emit
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('处理数据:', data);
});
// emit 不会等待异步监听器完成
emitter.emit('data', 'test');
console.log('emit 之后');
// 输出顺序:
// emit 之后
// 处理数据: test
等待异步监听器
const EventEmitter = require('node:events');
class AsyncEmitter extends EventEmitter {
async emitAsync(event, ...args) {
const listeners = this.listeners(event);
for (const listener of listeners) {
await listener(...args);
}
}
}
// 使用
const emitter = new AsyncEmitter();
emitter.on('save', async (data) => {
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('保存完成:', data);
});
async function main() {
await emitter.emitAsync('save', 'test data');
console.log('所有监听器执行完毕');
}
main();
监听器限制
默认限制
每个事件默认最多 10 个监听器,超过会发出警告:
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 设置最大监听器数量
emitter.setMaxListeners(20);
// 获取当前最大监听器数量
console.log(emitter.getMaxListeners()); // 20
// 获取/设置全局默认值
console.log(EventEmitter.defaultMaxListeners); // 10
EventEmitter.defaultMaxListeners = 20;
内存泄漏警告
当监听器数量超过限制时,会发出警告:
(node:12345) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 event listeners added. Use emitter.setMaxListeners() to increase limit
解决方法:
- 检查是否真的需要这么多监听器
- 确保在不需要时移除监听器
- 使用
setMaxListeners()增加限制
高级特性
events.once() 和 events.on() 静态方法
Node.js 提供了静态方法,用于以 Promise 方式等待事件:
const { EventEmitter, once, on } = require('node:events');
async function main() {
const emitter = new EventEmitter();
// 使用 once 等待单个事件
setTimeout(() => emitter.emit('ready', '数据'), 1000);
const data = await once(emitter, 'ready');
console.log('收到:', data); // ['数据']
// 使用 on 创建异步迭代器
setTimeout(() => emitter.emit('data', 1), 100);
setTimeout(() => emitter.emit('data', 2), 200);
setTimeout(() => emitter.emit('data', 3), 300);
setTimeout(() => emitter.emit('end'), 400);
// 异步迭代事件
const asyncIterator = on(emitter, 'data');
for await (const value of asyncIterator) {
console.log('迭代:', value);
if (value[0] >= 2) break; // 可以随时停止
}
}
main();
rawListeners() 获取原始监听器
rawListeners() 返回监听器数组的副本,包括包装器(如 .once() 创建的):
const { EventEmitter } = require('node:events');
const emitter = new EventEmitter();
emitter.once('event', () => console.log('只执行一次'));
// listeners() 返回包装后的函数
console.log(emitter.listeners('event')[0].name); // onceWrapper
// rawListeners() 返回原始数组
const raw = emitter.rawListeners('event');
console.log(raw[0].listener); // 原始监听器函数
// 可以直接调用原始监听器(不会自动移除)
raw[0].listener(); // 执行
raw[0].listener(); // 再次执行(不会自动移除)
EventEmitter 静态方法
const { EventEmitter, getEventListeners, setMaxListeners } = require('node:events');
const emitter = new EventEmitter();
emitter.on('event', () => {});
emitter.on('event', () => {});
// 获取监听器数组
const listeners = getEventListeners(emitter, 'event');
console.log(listeners.length); // 2
// 获取最大监听器数量
const max = EventEmitter.getMaxListeners(emitter);
console.log(max); // 10
// 设置最大监听器数量
setMaxListeners(20, emitter);
AbortController 与事件结合
Node.js 18+ 支持 events.addAbortListener,可以方便地处理可中止的操作:
const { EventEmitter, addAbortListener } = require('node:events');
const controller = new AbortController();
const emitter = new EventEmitter();
// 监听 abort 事件
const disposable = addAbortListener(controller.signal, () => {
console.log('操作被中止');
emitter.emit('cancelled');
});
// 取消操作
controller.abort();
// 清理监听器
disposable[Symbol.dispose]();
EventTarget 和 Event API
Node.js 提供了 EventTarget 和 Event API,这是 Web 标准的事件接口,与浏览器的 DOM 事件类似。
EventTarget 基本用法
const { EventTarget, Event } = require('node:events');
const target = new EventTarget();
// 添加监听器
target.addEventListener('message', (event) => {
console.log('收到消息:', event.detail);
});
// 触发事件
const event = new Event('message');
event.detail = { text: 'Hello' };
target.dispatchEvent(event);
CustomEvent 自定义事件
const { EventTarget, CustomEvent } = require('node:events');
const target = new EventTarget();
target.addEventListener('custom', (event) => {
console.log('自定义事件:', event.detail);
});
// CustomEvent 可以直接传递数据
const customEvent = new CustomEvent('custom', {
detail: { name: 'Alice', age: 25 }
});
target.dispatchEvent(customEvent);
NodeEventTarget 混合类型
NodeEventTarget 是 Node.js 扩展的 EventTarget,同时支持 EventEmitter 风格的 API:
const { NodeEventTarget } = require('node:events');
const target = new NodeEventTarget();
// EventEmitter 风格
target.on('event', (data) => console.log('EE 风格:', data));
target.emit('event', '数据');
// EventTarget 风格
target.addEventListener('event', (e) => console.log('ET 风格:', e));
target.dispatchEvent(new Event('event'));
EventEmitter 与 EventTarget 对比
// EventEmitter 示例
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
emitter.on('data', (a, b, c) => {
console.log('EventEmitter:', a, b, c);
});
emitter.emit('data', 1, 2, 3);
// 输出: EventEmitter: 1 2 3
// EventTarget 示例
const { EventTarget, Event } = require('node:events');
const target = new EventTarget();
target.addEventListener('data', (event) => {
console.log('EventTarget:', event.detail);
});
const event = new Event('data');
event.detail = [1, 2, 3];
target.dispatchEvent(event);
// 输出: EventTarget: [1, 2, 3]
主要差异:
| 特性 | EventEmitter | EventTarget |
|---|---|---|
| 监听器类型 | 函数 | 函数或对象 |
| 事件对象 | 无(直接传参) | Event 对象 |
| 错误处理 | 必须监听 error | 不特殊处理 error |
| 监听器数量限制 | 有(默认 10) | 无限制 |
| 返回值 | 链式调用 | 无返回值 |
| 适用场景 | Node.js 内部 | 跨平台代码 |
事件驱动架构最佳实践
错误处理模式
使用异步监听器时,需要特别注意错误处理:
const { EventEmitter } = require('node:events');
class AsyncEventEmitter extends EventEmitter {
constructor() {
super({ captureRejections: true });
}
async emitAsync(event, ...args) {
const listeners = this.listeners(event);
const results = [];
for (const listener of listeners) {
try {
const result = await listener(...args);
results.push(result);
} catch (error) {
this.emit('error', error);
}
}
return results;
}
}
// 使用示例
const emitter = new AsyncEventEmitter();
emitter.on('process', async (data) => {
await processData(data);
});
emitter.on('error', (err) => {
console.error('处理失败:', err);
});
await emitter.emitAsync('process', { id: 1 });
事件驱动中间件模式
const { EventEmitter } = require('node:events');
class Application extends EventEmitter {
constructor() {
super({ captureRejections: true });
this.middleware = [];
}
use(fn) {
this.middleware.push(fn);
return this;
}
async handleRequest(req, res) {
const ctx = { req, res, app: this };
try {
this.emit('request:start', ctx);
for (const fn of this.middleware) {
await fn(ctx);
}
this.emit('request:end', ctx);
} catch (error) {
this.emit('request:error', { ctx, error });
}
}
}
// 使用示例
const app = new Application();
app.use(async (ctx) => {
console.log('中间件 1');
ctx.body = 'Hello';
});
app.use(async (ctx) => {
console.log('中间件 2');
ctx.body += ' World';
});
app.on('request:start', (ctx) => {
console.log(`请求开始: ${ctx.req.url}`);
});
app.on('request:error', ({ ctx, error }) => {
console.error('请求错误:', error);
});
小结
本章我们学习了:
- EventEmitter 基础:创建、注册监听器、触发事件
- 监听器管理:添加、移除、一次性监听器
- 错误处理:处理 error 事件、errorMonitor、captureRejections
- 自定义事件发射器:继承 EventEmitter 创建事件驱动类
- 异步事件:处理异步监听器、最佳实践
- 监听器限制:避免内存泄漏
- 高级特性:once/on 静态方法、rawListeners、EventTarget API
- EventTarget API:Web 标准事件接口
练习
- 创建一个计时器类,支持 start、pause、stop 事件
- 实现一个简单的事件总线,支持跨模块通信
- 创建一个任务队列,使用事件通知任务状态变化
- 实现一个文件监听器,当文件变化时发出事件
- 使用 events.errorMonitor 实现错误监控系统
- 创建一个支持异步监听器的 EventEmitter,使用 captureRejections 捕获错误
- 实现 EventTarget 版本的简单事件系统,对比与 EventEmitter 的差异
- 使用 events.on() 实现一个异步事件流处理器