WebSocket 实时通信
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务端主动向客户端推送数据。这使得实时应用(如聊天室、通知系统、在线协作工具)的开发变得简单高效。
WebSocket 基础概念
HTTP 与 WebSocket 的区别
传统的 HTTP 通信是请求-响应模式:客户端发送请求,服务端返回响应。如果服务端有新数据需要推送,客户端必须不断轮询(polling),这既浪费资源又增加了延迟。
WebSocket 建立了一种持久的双向通道,客户端和服务端可以随时互相发送消息,无需重复建立连接。
HTTP 模式(轮询):
客户端 ----请求----> 服务端
客户端 <---响应----- 服务端
客户端 ----请求----> 服务端 (不断轮询)
客户端 <---响应----- 服务端
WebSocket 模式:
客户端 <==双向通道==> 服务端
(随时互发消息)
WebSocket 连接过程
WebSocket 连接通过 HTTP 协议升级建立:
- 客户端发送 HTTP 请求,带有
Upgrade: websocket头 - 服务端返回
101 Switching Protocols状态码 - 连接升级为 WebSocket,开始双向通信
客户端请求:
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Socket.IO:功能丰富的实时通信库
Socket.IO 是最流行的 Node.js 实时通信库,它在 WebSocket 之上提供了更多功能。
Socket.IO 与原生 WebSocket 的区别
Socket.IO 并非纯粹的 WebSocket 实现,它提供了以下额外功能:
| 特性 | 原生 WebSocket | Socket.IO |
|---|---|---|
| 自动重连 | 需手动实现 | 内置支持 |
| 断线重连 | 无 | 指数退避重连 |
| 心跳检测 | 需手动实现 | 内置支持 |
| 消息缓冲 | 无 | 断线时自动缓冲 |
| 房间/命名空间 | 无 | 原生支持 |
| 广播 | 需手动实现 | 内置支持 |
| 确认机制 | 无 | 内置支持 |
| 降级兼容 | 无 | HTTP 长轮询降级 |
安装与基本使用
npm install socket.io
服务端配置
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const app = express();
const httpServer = createServer(app);
// 创建 Socket.IO 服务器
const io = new Server(httpServer, {
// 配置选项
cors: {
origin: 'http://localhost:3000', // 允许的前端域名
methods: ['GET', 'POST']
},
// 传输方式(默认同时支持 WebSocket 和 HTTP 长轮询)
transports: ['websocket', 'polling'],
// 心跳配置
pingInterval: 25000, // 心跳间隔(毫秒)
pingTimeout: 20000 // 心跳超时(毫秒)
});
// 监听连接事件
io.on('connection', (socket) => {
console.log(`客户端已连接: ${socket.id}`);
// 监听断开连接
socket.on('disconnect', (reason) => {
console.log(`客户端已断开: ${reason}`);
});
});
// 启动服务器
httpServer.listen(3000, () => {
console.log('服务器运行在 http://localhost:3000');
});
客户端配置
<!-- 在 HTML 中引入客户端库 -->
<script src="/socket.io/socket.io.js"></script>
<script>
// 建立连接
const socket = io('http://localhost:3000', {
transports: ['websocket', 'polling']
});
// 连接成功
socket.on('connect', () => {
console.log('已连接到服务器:', socket.id);
});
// 连接错误
socket.on('connect_error', (error) => {
console.error('连接错误:', error);
});
// 断开连接
socket.on('disconnect', (reason) => {
console.log('已断开连接:', reason);
});
</script>
或者使用 npm 安装客户端:
npm install socket.io-client
// ES Module 语法
import { io } from 'socket.io-client';
// CommonJS 语法
const { io } = require('socket.io-client');
const socket = io('http://localhost:3000');
事件通信
Socket.IO 使用事件驱动模型,客户端和服务端通过 emit 发送事件、on 监听事件。
发送和接收消息
// 服务端
io.on('connection', (socket) => {
// 接收客户端消息
socket.on('chat message', (msg) => {
console.log('收到消息:', msg);
// 发送消息给当前客户端
socket.emit('chat message', `服务端收到: ${msg}`);
// 广播给所有客户端(除了发送者)
socket.broadcast.emit('chat message', msg);
// 广播给所有客户端(包括发送者)
io.emit('chat message', msg);
});
});
// 客户端
const socket = io();
// 发送消息
socket.emit('chat message', '你好,服务器!');
// 接收消息
socket.on('chat message', (msg) => {
console.log('收到消息:', msg);
});
确认机制(Acknowledge)
Socket.IO 支持请求-响应模式,发送方可以等待接收方的确认:
// 服务端
socket.on('get data', (callback) => {
// 执行某些操作
const data = { name: 'Express', version: '5.0' };
// 通过 callback 返回结果
callback({ status: 'success', data });
});
// 或者异步操作
socket.on('async operation', async (callback) => {
try {
const result = await someAsyncTask();
callback({ success: true, result });
} catch (error) {
callback({ success: false, error: error.message });
}
});
// 客户端
socket.emit('get data', (response) => {
console.log('服务端响应:', response);
// { status: 'success', data: { name: 'Express', version: '5.0' } }
});
// 带超时的确认
socket.timeout(5000).emit('async operation', (err, response) => {
if (err) {
console.error('操作超时');
return;
}
console.log('响应:', response);
});
命名空间(Namespaces)
命名空间用于分离应用逻辑,所有连接默认连接到 / 命名空间。
// 服务端 - 定义命名空间
const chatNsp = io.of('/chat');
chatNsp.on('connection', (socket) => {
console.log('用户连接到聊天命名空间');
socket.on('message', (msg) => {
chatNsp.emit('message', msg); // 只广播给 /chat 命名空间的用户
});
});
const adminNsp = io.of('/admin');
adminNsp.on('connection', (socket) => {
console.log('管理员连接');
// 可以添加认证中间件
socket.on('stats', () => {
adminNsp.emit('stats', {
totalConnections: io.engine.clientsCount
});
});
});
// 客户端 - 连接到特定命名空间
const chatSocket = io('/chat');
const adminSocket = io('/admin');
chatSocket.on('message', (msg) => {
console.log('聊天消息:', msg);
});
adminSocket.on('stats', (stats) => {
console.log('统计数据:', stats);
});
房间(Rooms)
房间用于将连接分组,便于向特定组广播消息。每个 socket 默认加入以其 ID 命名的房间。
io.on('connection', (socket) => {
// 加入房间
socket.join('room-101');
// 同时加入多个房间
socket.join(['room-101', 'room-202']);
// 离开房间
socket.leave('room-101');
// 获取 socket 所在的所有房间
console.log(socket.rooms); // Set { socket.id, 'room-101' }
});
// 向特定房间广播
io.to('room-101').emit('notification', '房间 101 的消息');
// 向多个房间广播
io.to('room-101').to('room-202').emit('notification', '多个房间的消息');
// 向房间广播(排除发送者)
socket.to('room-101').emit('notification', '房间内其他人收到');
实战示例:聊天室
io.on('connection', (socket) => {
// 用户加入聊天室
socket.on('join room', (roomId) => {
socket.join(roomId);
// 通知房间内其他人
socket.to(roomId).emit('user joined', {
userId: socket.id,
message: '新用户加入了聊天室'
});
// 发送欢迎消息给当前用户
socket.emit('welcome', {
message: `欢迎加入房间 ${roomId}`
});
});
// 发送聊天消息
socket.on('chat message', (data) => {
const { roomId, message } = data;
// 向房间内所有人广播消息
io.to(roomId).emit('chat message', {
userId: socket.id,
message,
timestamp: Date.now()
});
});
// 用户离开聊天室
socket.on('leave room', (roomId) => {
socket.leave(roomId);
socket.to(roomId).emit('user left', {
userId: socket.id
});
});
// 断开连接时自动离开所有房间
socket.on('disconnect', () => {
// 通知所有房间用户离开了
socket.rooms.forEach((room) => {
socket.to(room).emit('user left', {
userId: socket.id
});
});
});
});
中间件
Socket.IO 支持中间件,可用于认证、日志记录等。
// 全局中间件(应用于所有命名空间)
io.use((socket, next) => {
const token = socket.handshake.auth.token;
if (!token) {
return next(new Error('认证失败:缺少 token'));
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
socket.user = decoded; // 将用户信息附加到 socket
next();
} catch (err) {
next(new Error('认证失败:token 无效'));
}
});
// 命名空间中间件
io.of('/admin').use((socket, next) => {
if (socket.user?.role !== 'admin') {
return next(new Error('权限不足'));
}
next();
});
// 客户端发送认证信息
const socket = io({
auth: {
token: 'your-jwt-token'
}
});
// 处理认证错误
socket.on('connect_error', (err) => {
if (err.message === '认证失败') {
console.error('请先登录');
}
});
ws 库:轻量级 WebSocket 实现
如果只需要纯 WebSocket 功能,ws 是一个更轻量、更快速的选择。
安装与基本使用
npm install ws
独立 WebSocket 服务器
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws, req) => {
// 获取客户端 IP
const ip = req.socket.remoteAddress;
console.log(`客户端连接: ${ip}`);
// 发送消息
ws.send('欢迎连接到 WebSocket 服务器');
// 接收消息
ws.on('message', (data, isBinary) => {
const message = isBinary ? data : data.toString();
console.log('收到消息:', message);
// 回显消息
ws.send(`服务端收到: ${message}`);
});
// 错误处理
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
// 关闭连接
ws.on('close', () => {
console.log('客户端断开连接');
});
});
console.log('WebSocket 服务器运行在 ws://localhost:8080');
与 Express 集成
ws 可以与 Express 共享同一个 HTTP 服务器:
const express = require('express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const app = express();
const server = createServer(app);
// Express 路由
app.get('/', (req, res) => {
res.send('Hello Express');
});
// 创建 WebSocket 服务器,共享 HTTP 服务器
const wss = new WebSocketServer({ server });
wss.on('connection', (ws, req) => {
// 获取 URL 路径
const url = new URL(req.url, `http://${req.headers.host}`);
console.log('连接路径:', url.pathname);
ws.on('message', (data) => {
ws.send(`收到: ${data}`);
});
});
server.listen(3000, () => {
console.log('服务器运行在 http://localhost:3000');
console.log('WebSocket 运行在 ws://localhost:3000');
});
多路径 WebSocket
通过 noServer 模式实现不同路径的 WebSocket:
const express = require('express');
const { createServer } = require('http');
const { WebSocketServer } = require('ws');
const app = express();
const server = createServer(app);
// 创建两个 WebSocket 服务器
const chatWss = new WebSocketServer({ noServer: true });
const notifyWss = new WebSocketServer({ noServer: true });
chatWss.on('connection', (ws) => {
ws.on('message', (data) => {
// 广播给所有聊天客户端
chatWss.clients.forEach((client) => {
if (client.readyState === 1) { // WebSocket.OPEN
client.send(data.toString());
}
});
});
});
notifyWss.on('connection', (ws) => {
ws.on('message', (data) => {
console.log('通知:', data.toString());
});
});
// 根据路径分发 WebSocket 连接
server.on('upgrade', (req, socket, head) => {
const { pathname } = new URL(req.url, `http://${req.headers.host}`);
if (pathname === '/chat') {
chatWss.handleUpgrade(req, socket, head, (ws) => {
chatWss.emit('connection', ws, req);
});
} else if (pathname === '/notify') {
notifyWss.handleUpgrade(req, socket, head, (ws) => {
notifyWss.emit('connection', ws, req);
});
} else {
socket.destroy();
}
});
server.listen(3000);
心跳检测
WebSocket 连接可能在双方不知情的情况下断开,需要心跳机制检测:
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
// 心跳检测函数
function heartbeat() {
this.isAlive = true;
}
wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', heartbeat); // 收到 pong 时标记存活
});
// 定时发送 ping,检测连接状态
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate(); // 终止不响应的连接
}
ws.isAlive = false;
ws.ping(); // 发送 ping
});
}, 30000); // 每 30 秒检测一次
// 服务器关闭时清理定时器
wss.on('close', () => {
clearInterval(interval);
});
// 客户端心跳
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:8080');
let pingTimeout;
function heartbeat() {
clearTimeout(pingTimeout);
// 如果 30 秒内没收到 ping,认为连接断开
pingTimeout = setTimeout(() => {
ws.terminate();
}, 30000 + 1000);
}
ws.on('open', heartbeat);
ws.on('ping', heartbeat);
ws.on('close', () => clearTimeout(pingTimeout));
广播消息
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });
// 广播给所有客户端
function broadcast(data) {
wss.clients.forEach((client) => {
if (client.readyState === 1) { // WebSocket.OPEN
client.send(data);
}
});
}
// 广播给除了发送者外的所有客户端
wss.on('connection', (ws) => {
ws.on('message', (data) => {
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === 1) {
client.send(data);
}
});
});
});
实战案例
实时通知系统
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: { origin: '*' }
});
// 存储用户连接
const userSockets = new Map();
// 认证中间件
io.use((socket, next) => {
const userId = socket.handshake.auth.userId;
if (userId) {
socket.userId = userId;
userSockets.set(userId, socket);
next();
} else {
next(new Error('未认证'));
}
});
io.on('connection', (socket) => {
console.log(`用户 ${socket.userId} 已连接`);
// 用户加入自己的通知频道
socket.join(`user:${socket.userId}`);
socket.on('disconnect', () => {
userSockets.delete(socket.userId);
console.log(`用户 ${socket.userId} 已断开`);
});
});
// REST API 发送通知
app.use(express.json());
app.post('/api/notify', (req, res) => {
const { userId, title, message } = req.body;
// 向特定用户发送通知
io.to(`user:${userId}`).emit('notification', {
title,
message,
timestamp: Date.now()
});
res.json({ success: true });
});
// 广播系统通知
app.post('/api/broadcast', (req, res) => {
const { title, message } = req.body;
io.emit('system notification', {
title,
message,
timestamp: Date.now()
});
res.json({ success: true });
});
httpServer.listen(3000);
实时协作编辑
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer);
// 文档内容存储
const documents = new Map();
io.on('connection', (socket) => {
// 加入文档编辑
socket.on('join document', (docId) => {
socket.join(`doc:${docId}`);
// 发送当前文档内容
if (documents.has(docId)) {
socket.emit('document sync', documents.get(docId));
}
});
// 接收文档变更
socket.on('document change', (data) => {
const { docId, content, cursor } = data;
// 更新文档
documents.set(docId, content);
// 广播给其他编辑者
socket.to(`doc:${docId}`).emit('document update', {
content,
cursor,
userId: socket.id
});
});
// 光标位置同步
socket.on('cursor move', (data) => {
const { docId, position } = data;
socket.to(`doc:${docId}`).emit('cursor update', {
userId: socket.id,
position
});
});
});
httpServer.listen(3000);
实时数据监控大屏
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer);
// 模拟数据推送
setInterval(() => {
// 实时数据
const data = {
timestamp: Date.now(),
cpu: Math.random() * 100,
memory: Math.random() * 100,
requests: Math.floor(Math.random() * 1000)
};
// 推送给所有监控客户端
io.emit('metrics', data);
}, 1000);
// 订阅特定服务
io.on('connection', (socket) => {
socket.on('subscribe', (serviceId) => {
socket.join(`service:${serviceId}`);
});
socket.on('unsubscribe', (serviceId) => {
socket.leave(`service:${serviceId}`);
});
});
// API 接收告警
app.post('/api/alert', express.json(), (req, res) => {
const { serviceId, level, message } = req.body;
io.to(`service:${serviceId}`).emit('alert', {
level,
message,
timestamp: Date.now()
});
res.json({ success: true });
});
httpServer.listen(3000);
生产环境最佳实践
1. 连接认证
// Socket.IO 认证中间件
io.use(async (socket, next) => {
try {
const token = socket.handshake.auth.token;
const decoded = jwt.verify(token, process.env.JWT_SECRET);
// 验证用户状态
const user = await User.findById(decoded.id);
if (!user || !user.active) {
throw new Error('用户不存在或已禁用');
}
socket.user = user;
next();
} catch (err) {
next(new Error('认证失败'));
}
});
2. 限流保护
const rateLimit = require('express-rate-limit');
// 限制连接频率
const connectionLimiter = rateLimit({
windowMs: 60 * 1000,
max: 10,
message: '连接请求过于频繁'
});
// 应用于 Socket.IO
io.use((socket, next) => {
connectionLimiter(socket.request, socket.request.res, next);
});
// 限制消息频率
io.use((socket, next) => {
socket.use((packet, next) => {
// 实现消息限流逻辑
next();
});
next();
});
3. 优雅关闭
const gracefulShutdown = () => {
console.log('正在关闭 WebSocket 连接...');
// 通知所有客户端
io.emit('server shutdown', {
message: '服务器即将关闭,请保存数据'
});
// 断开所有连接
io.disconnectSockets(true);
// 关闭服务器
io.close(() => {
console.log('WebSocket 服务器已关闭');
process.exit(0);
});
};
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
4. 集群支持
使用 Redis 适配器支持多实例:
npm install @socket.io/redis-adapter redis
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
io.adapter(createAdapter(pubClient, subClient));
});
5. 监控指标
// 连接统计
setInterval(() => {
const stats = {
connected: io.engine.clientsCount,
namespaces: Object.keys(io.nsps).length,
rooms: io.sockets.adapter.rooms.size
};
console.log('WebSocket 统计:', stats);
}, 60000);
Socket.IO 与 ws 选择建议
| 场景 | 推荐 |
|---|---|
| 需要自动重连、房间、广播 | Socket.IO |
| 需要兼容老旧浏览器 | Socket.IO |
| 追求极致性能和最小体积 | ws |
| 只需基本 WebSocket 功能 | ws |
| 需要与原生 WebSocket 客户端通信 | ws |
小结
本章介绍了 Express 应用中的实时通信方案:
- WebSocket 基础:理解 HTTP 与 WebSocket 的区别,连接建立过程
- Socket.IO:功能丰富的实时通信库,支持自动重连、房间、命名空间、确认机制
- ws 库:轻量级高性能的纯 WebSocket 实现
- 实战应用:聊天室、实时通知、协作编辑、监控大屏
- 生产实践:认证、限流、优雅关闭、集群支持
实时通信是现代 Web 应用的重要能力,选择合适的方案并遵循最佳实践,可以构建稳定高效的实时应用。
练习
- 使用 Socket.IO 实现一个简单的聊天室,支持加入房间、发送消息、显示在线用户
- 使用 ws 库实现一个实时日志推送系统,后端推送日志,前端实时显示
- 实现 WebSocket 认证中间件,使用 JWT 验证用户身份
- 使用 Redis 适配器配置 Socket.IO 集群模式
- 实现一个实时协作的白板应用,支持多用户同时绘图