事件机制
Node.js 采用事件驱动架构,事件模块是 Node.js 的核心模块之一。通过 EventEmitter 类,可以实现发布-订阅模式,处理异步事件。
EventEmitter 基础
导入模块
const EventEmitter = require('node:events');
// 创建事件发射器实例
const emitter = new EventEmitter();
基本使用
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 注册事件监听器
emitter.on('message', (data) => {
console.log('收到消息:', data);
});
// 触发事件
emitter.emit('message', 'Hello, World!');
emitter.emit('message', { text: '你好', from: 'Alice' });
事件监听器方法
| 方法 | 说明 |
|---|---|
on(event, listener) | 添加监听器(可重复添加) |
once(event, listener) | 添加一次性监听器(触发后自动移除) |
prependListener(event, listener) | 在监听器数组开头添加监听器 |
prependOnceListener(event, listener) | 在开头添加一次性监听器 |
removeListener(event, listener) | 移除指定监听器 |
removeAllListeners([event]) | 移除所有/指定事件的所有监听器 |
off(event, listener) | removeListener 的别名 |
emit(event, [...args]) | 触发事件 |
listeners(event) | 获取事件的所有监听器 |
listenerCount(event) | 获取监听器数量 |
监听器详解
多个监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 添加多个监听器,按添加顺序执行
emitter.on('login', (user) => {
console.log(`用户 ${user} 登录了`);
});
emitter.on('login', (user) => {
console.log(`记录登录日志: ${user}`);
});
emitter.on('login', (user) => {
console.log(`发送登录通知: ${user}`);
});
emitter.emit('login', 'Alice');
// 输出:
// 用户 Alice 登录了
// 记录登录日志: Alice
// 发送登录通知: Alice
一次性监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 只触发一次
emitter.once('init', () => {
console.log('初始化完成');
});
emitter.emit('init'); // 输出: 初始化完成
emitter.emit('init'); // 无输出(监听器已被移除)
移除监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
function onMessage(data) {
console.log('消息:', data);
}
// 添加监听器
emitter.on('message', onMessage);
emitter.emit('message', 'test'); // 输出: 消息: test
// 移除监听器
emitter.off('message', onMessage);
emitter.emit('message', 'test2'); // 无输出
// 移除所有监听器
emitter.removeAllListeners('message');
// 或移除所有事件的所有监听器
emitter.removeAllListeners();
获取监听器信息
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
function listener1() { console.log('listener1'); }
function listener2() { console.log('listener2'); }
emitter.on('event', listener1);
emitter.on('event', listener2);
// 获取监听器数量
console.log(emitter.listenerCount('event')); // 2
// 获取监听器数组
console.log(emitter.listeners('event'));
// [ [Function: listener1], [Function: listener2] ]
// 获取所有事件名
emitter.on('another', () => {});
console.log(emitter.eventNames()); // [ 'event', 'another' ]
错误处理
错误事件
当 EventEmitter 发生错误时,会触发 'error' 事件。如果没有监听器,Node.js 会抛出异常并退出:
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 必须注册错误监听器,否则程序会崩溃
emitter.on('error', (err) => {
console.error('发生错误:', err.message);
});
// 触发错误事件
emitter.emit('error', new Error('出问题了'));
全局错误处理
const EventEmitter = require('node:events');
// 设置全局错误处理器
EventEmitter.defaultMaxListeners = 20;
// 或者在进程级别捕获未处理的错误
process.on('uncaughtException', (err) => {
console.error('未捕获的异常:', err);
});
自定义事件发射器
继承 EventEmitter
const EventEmitter = require('node:events');
class UserService extends EventEmitter {
constructor() {
super();
this.users = [];
}
addUser(user) {
this.users.push(user);
// 触发事件
this.emit('userAdded', user);
}
removeUser(userId) {
const index = this.users.findIndex(u => u.id === userId);
if (index !== -1) {
const user = this.users.splice(index, 1)[0];
this.emit('userRemoved', user);
}
}
}
// 使用
const userService = new UserService();
// 订阅事件
userService.on('userAdded', (user) => {
console.log(`用户添加: ${user.name}`);
});
userService.on('userRemoved', (user) => {
console.log(`用户删除: ${user.name}`);
});
// 触发事件
userService.addUser({ id: 1, name: 'Alice' });
userService.removeUser(1);
实际应用示例
const EventEmitter = require('node:events');
class DownloadManager extends EventEmitter {
constructor() {
super();
this.downloads = new Map();
}
startDownload(url) {
const id = Date.now().toString();
const download = {
id,
url,
progress: 0,
status: 'pending'
};
this.downloads.set(id, download);
this.emit('downloadStarted', download);
// 模拟下载进度
const interval = setInterval(() => {
download.progress += 10;
this.emit('progress', download);
if (download.progress >= 100) {
clearInterval(interval);
download.status = 'completed';
this.emit('downloadCompleted', download);
}
}, 500);
return id;
}
cancelDownload(id) {
const download = this.downloads.get(id);
if (download) {
download.status = 'cancelled';
this.emit('downloadCancelled', download);
this.downloads.delete(id);
}
}
}
// 使用
const manager = new DownloadManager();
manager.on('downloadStarted', (download) => {
console.log(`开始下载: ${download.url}`);
});
manager.on('progress', (download) => {
console.log(`下载进度: ${download.progress}%`);
});
manager.on('downloadCompleted', (download) => {
console.log(`下载完成: ${download.url}`);
});
manager.startDownload('https://example.com/file.zip');
异步事件处理
异步监听器
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 异步监听器
emitter.on('data', async (data) => {
// 异步操作不会阻塞 emit
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('处理数据:', data);
});
// emit 不会等待异步监听器完成
emitter.emit('data', 'test');
console.log('emit 之后');
// 输出顺序:
// emit 之后
// 处理数据: test
等待异步监听器
const EventEmitter = require('node:events');
class AsyncEmitter extends EventEmitter {
async emitAsync(event, ...args) {
const listeners = this.listeners(event);
for (const listener of listeners) {
await listener(...args);
}
}
}
// 使用
const emitter = new AsyncEmitter();
emitter.on('save', async (data) => {
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('保存完成:', data);
});
async function main() {
await emitter.emitAsync('save', 'test data');
console.log('所有监听器执行完毕');
}
main();
监听器限制
默认限制
每个事件默认最多 10 个监听器,超过会发出警告:
const EventEmitter = require('node:events');
const emitter = new EventEmitter();
// 设置最大监听器数量
emitter.setMaxListeners(20);
// 获取当前最大监听器数量
console.log(emitter.getMaxListeners()); // 20
// 获取/设置全局默认值
console.log(EventEmitter.defaultMaxListeners); // 10
EventEmitter.defaultMaxListeners = 20;
内存泄漏警告
当监听器数量超过限制时,会发出警告:
(node:12345) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 event listeners added. Use emitter.setMaxListeners() to increase limit
解决方法:
- 检查是否真的需要这么多监听器
- 确保在不需要时移除监听器
- 使用
setMaxListeners()增加限制
事件捕获与冒泡
EventEmitter 不支持 DOM 那样的捕获和冒泡,但可以实现类似机制:
const EventEmitter = require('node:events');
class HierarchicalEmitter extends EventEmitter {
constructor(parent = null) {
super();
this.parent = parent;
}
emit(event, ...args) {
// 先执行当前层级的监听器
let result = super.emit(event, ...args);
// 然后冒泡到父级
if (this.parent && !event.startsWith('!')) {
this.parent.emit(event, ...args);
}
return result;
}
}
// 使用
const parent = new HierarchicalEmitter();
const child = new HierarchicalEmitter(parent);
parent.on('event', () => console.log('父级收到事件'));
child.on('event', () => console.log('子级收到事件'));
child.emit('event');
// 输出:
// 子级收到事件
// 父级收到事件
小结
本章我们学习了:
- EventEmitter 基础:创建、注册监听器、触发事件
- 监听器管理:添加、移除、一次性监听器
- 错误处理:处理 error 事件
- 自定义事件发射器:继承 EventEmitter 创建事件驱动类
- 异步事件:处理异步监听器
- 监听器限制:避免内存泄漏
练习
- 创建一个计时器类,支持 start、pause、stop 事件
- 实现一个简单的事件总线,支持跨模块通信
- 创建一个任务队列,使用事件通知任务状态变化
- 实现一个文件监听器,当文件变化时发出事件