最佳实践
构建生产级别的 WebSocket 应用需要考虑连接管理、错误处理、安全性和性能优化等多个方面。本章总结 WebSocket 开发的最佳实践。
连接管理
连接池管理
对于高并发场景,需要有效管理连接池:
class ConnectionPool {
constructor(maxConnections = 10000) {
this.connections = new Map();
this.maxConnections = maxConnections;
}
add(id, connection) {
if (this.connections.size >= this.maxConnections) {
this.evictOldest();
}
this.connections.set(id, {
connection,
lastActivity: Date.now(),
metadata: {}
});
}
evictOldest() {
let oldest = null;
let oldestTime = Infinity;
for (const [id, data] of this.connections) {
if (data.lastActivity < oldestTime) {
oldestTime = data.lastActivity;
oldest = id;
}
}
if (oldest) {
const conn = this.connections.get(oldest);
conn.connection.close(1001, '连接超时');
this.connections.delete(oldest);
}
}
updateActivity(id) {
const conn = this.connections.get(id);
if (conn) {
conn.lastActivity = Date.now();
}
}
}
连接状态追踪
class ConnectionState {
constructor() {
this.state = 'disconnected';
this.retryCount = 0;
this.lastConnected = null;
this.lastDisconnected = null;
}
connect() {
this.state = 'connecting';
}
connected() {
this.state = 'connected';
this.retryCount = 0;
this.lastConnected = Date.now();
}
disconnect() {
this.state = 'disconnected';
this.lastDisconnected = Date.now();
}
shouldReconnect() {
if (this.state === 'connected') return false;
if (this.retryCount >= 5) return false;
return true;
}
getRetryDelay() {
const baseDelay = 1000;
const maxDelay = 30000;
const delay = Math.min(baseDelay * Math.pow(2, this.retryCount), maxDelay);
return delay + Math.random() * 1000;
}
}
心跳机制
心跳机制对于检测断开的连接至关重要。
客户端心跳
class HeartbeatManager {
constructor(socket, options = {}) {
this.socket = socket;
this.interval = options.interval || 30000;
this.timeout = options.timeout || 5000;
this.maxMissed = options.maxMissed || 3;
this.missedHeartbeats = 0;
this.timer = null;
this.timeoutTimer = null;
}
start() {
this.stop();
this.timer = setInterval(() => this.sendPing(), this.interval);
}
stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}
sendPing() {
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}
this.missedHeartbeats++;
if (this.missedHeartbeats > this.maxMissed) {
this.stop();
this.socket.close(1001, '心跳超时');
return;
}
this.socket.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
this.timeoutTimer = setTimeout(() => {
if (this.missedHeartbeats > 0) {
console.warn('心跳响应超时');
}
}, this.timeout);
}
onPong() {
this.missedHeartbeats = 0;
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}
}
服务端心跳
function setupHeartbeat(wss, interval = 30000) {
const clients = new Map();
wss.on('connection', (ws) => {
clients.set(ws, { isAlive: true, lastPong: Date.now() });
ws.on('pong', () => {
const client = clients.get(ws);
if (client) {
client.isAlive = true;
client.lastPong = Date.now();
}
});
ws.on('close', () => {
clients.delete(ws);
});
});
const timer = setInterval(() => {
wss.clients.forEach((ws) => {
const client = clients.get(ws);
if (!client || !client.isAlive) {
ws.terminate();
clients.delete(ws);
return;
}
client.isAlive = false;
ws.ping();
});
}, interval);
wss.on('close', () => {
clearInterval(timer);
});
}
错误处理
客户端错误处理
class RobustWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: 5,
retryDelay: 1000,
maxRetryDelay: 30000,
...options
};
this.socket = null;
this.retryCount = 0;
this.forcedClose = false;
this.messageQueue = [];
this.handlers = {
open: [],
message: [],
error: [],
close: []
};
this.connect();
}
connect() {
try {
this.socket = new WebSocket(this.url);
this.socket.onopen = (event) => {
this.retryCount = 0;
this.flushQueue();
this.emit('open', event);
};
this.socket.onmessage = (event) => {
this.emit('message', event);
};
this.socket.onerror = (event) => {
this.emit('error', event);
};
this.socket.onclose = (event) => {
this.emit('close', event);
if (!this.forcedClose && this.shouldReconnect()) {
this.scheduleReconnect();
}
};
} catch (error) {
this.emit('error', error);
if (this.shouldReconnect()) {
this.scheduleReconnect();
}
}
}
shouldReconnect() {
return this.retryCount < this.options.maxRetries;
}
scheduleReconnect() {
this.retryCount++;
const delay = Math.min(
this.options.retryDelay * Math.pow(2, this.retryCount - 1),
this.options.maxRetryDelay
);
setTimeout(() => this.connect(), delay);
}
send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(data);
} else {
this.messageQueue.push(data);
}
}
flushQueue() {
while (this.messageQueue.length > 0) {
const data = this.messageQueue.shift();
this.socket.send(data);
}
}
close(code = 1000, reason = 'Normal closure') {
this.forcedClose = true;
if (this.socket) {
this.socket.close(code, reason);
}
}
on(event, handler) {
if (this.handlers[event]) {
this.handlers[event].push(handler);
}
}
emit(event, data) {
this.handlers[event]?.forEach(handler => handler(data));
}
}
服务端错误处理
function handleConnection(ws, req) {
let isAuthenticated = false;
try {
isAuthenticated = authenticateConnection(req);
} catch (error) {
ws.close(1008, 'Authentication failed');
return;
}
if (!isAuthenticated) {
ws.close(1008, 'Unauthorized');
return;
}
ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleMessage(ws, message);
} catch (error) {
if (error instanceof SyntaxError) {
ws.send(JSON.stringify({ error: 'Invalid JSON' }));
} else {
console.error('处理消息错误:', error);
ws.send(JSON.stringify({ error: 'Internal error' }));
}
}
});
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
ws.on('close', (code, reason) => {
console.log(`连接关闭: ${code} - ${reason}`);
cleanupConnection(ws);
});
}
安全考虑
WebSocket 安全涉及多个层面,包括传输安全、认证授权、跨站攻击防护等。理解这些安全威胁和防护措施对于构建生产级应用至关重要。
传输安全
始终使用 WSS(WebSocket Secure):
在生产环境中,必须使用 wss:// 而非 ws://。WSS 在 WebSocket 之上添加了 TLS 加密层,提供以下保护:
| 威胁 | WS 漏洞 | WSS 保护 |
|---|---|---|
| 中间人窃听 | 数据明文传输 | TLS 加密 |
| 数据篡改 | 无完整性保护 | TLS MAC 校验 |
| 身份伪造 | 无法验证服务器 | 证书验证 |
| 会话劫持 | Cookie 明文传输 | 加密传输 |
// 正确做法:根据页面协议自动选择
const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${location.host}/ws`;
const socket = new WebSocket(wsUrl);
// 错误做法:HTTPS 页面使用 WS 会被浏览器阻止
// Mixed Content: The page was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://...'
认证机制
WebSocket 协议本身不定义认证机制,认证需要在应用层实现。以下是几种常见的认证方案及其优缺点:
方案对比:
| 方案 | 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| URL 参数 | ws://host?token=xxx | 简单直接 | Token 暴露在日志中 | 开发测试 |
| Cookie | 同域 Cookie 自动携带 | 安全,不暴露 | 仅限同域 | 同域应用 |
| 子协议 | new WebSocket(url, [token]) | 不暴露在 URL | 滥用子协议语义 | 特殊场景 |
| 首条消息认证 | 连接后发送认证 | 灵活 | 连接消耗资源 | 复杂认证流程 |
方案一:URL 参数认证
最简单但安全性较低的方案:
// 客户端
const token = localStorage.getItem('token');
const socket = new WebSocket(`wss://api.example.com/ws?token=${token}`);
// 服务端
const jwt = require('jsonwebtoken');
function authenticateConnection(req) {
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get('token');
if (!token) {
return { authenticated: false, reason: 'Missing token' };
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
return { authenticated: true, user: decoded };
} catch (error) {
return { authenticated: false, reason: 'Invalid token' };
}
}
const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const result = authenticateConnection(info.req);
callback(result.authenticated, result.authenticated ? 200 : 401, result.reason);
}
});
安全注意事项:
- Token 会出现在服务器访问日志、代理日志、浏览器历史记录中
- 仅适用于短期有效的一次性 Token
- 生产环境不推荐
方案二:Cookie 认证
最安全的方式,利用现有会话:
// 客户端:同域情况下 Cookie 自动发送
const socket = new WebSocket('wss://api.example.com/ws');
// 服务端:验证 Cookie 中的会话
const cookieParser = require('cookie-parser');
const session = require('express-session');
// Express 配置
app.use(cookieParser());
app.use(session({
secret: process.env.SESSION_SECRET,
cookie: { secure: true, sameSite: 'strict' }
}));
// WebSocket 认证
const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
// 解析 Cookie
const cookies = cookieParser.signedCookies(
require('cookie').parse(info.req.headers.cookie || ''),
process.env.SESSION_SECRET
);
const sessionId = cookies['connect.sid'];
if (!sessionId) {
callback(false, 401, 'No session');
return;
}
// 从会话存储获取用户信息
sessionStore.get(sessionId, (err, session) => {
if (err || !session || !session.user) {
callback(false, 401, 'Invalid session');
return;
}
info.req.user = session.user;
callback(true);
});
}
});
方案三:首条消息认证
连接后通过首条消息完成认证,适合复杂认证流程:
// 服务端
const pendingAuth = new Map();
wss.on('connection', (ws, req) => {
// 连接初始为未认证状态
ws.isAuthenticated = false;
ws.authTimeout = setTimeout(() => {
if (!ws.isAuthenticated) {
ws.close(1008, 'Authentication timeout');
}
}, 5000); // 5秒内必须完成认证
ws.on('message', (data) => {
// 未认证时只处理认证消息
if (!ws.isAuthenticated) {
try {
const msg = JSON.parse(data);
if (msg.type === 'auth') {
const user = verifyToken(msg.token);
if (user) {
ws.isAuthenticated = true;
ws.user = user;
clearTimeout(ws.authTimeout);
ws.send(JSON.stringify({ type: 'auth_success' }));
} else {
ws.close(1008, 'Authentication failed');
}
}
} catch (e) {
ws.close(1002, 'Invalid message');
}
return;
}
// 已认证,正常处理消息
handleMessage(ws, data);
});
});
// 客户端
const socket = new WebSocket('wss://api.example.com/ws');
socket.onopen = () => {
// 立即发送认证消息
socket.send(JSON.stringify({
type: 'auth',
token: localStorage.getItem('token')
}));
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'auth_success') {
console.log('认证成功');
// 开始正常通信
}
};
最佳实践总结:
- 同域应用优先使用 Cookie:利用现有会话机制,最安全便捷
- 跨域应用使用 URL 参数 + 短期 Token:生成一次性连接 Token,使用后立即失效
- 复杂认证使用首条消息:支持 OAuth、多因素认证等复杂流程
- 始终设置认证超时:防止未认证连接占用资源
Origin 验证
跨站 WebSocket 劫持攻击(CSWSH):
CSWSH 是一种类似于 CSRF 的攻击方式。攻击者在其控制的网站上嵌入 JavaScript,尝试连接受害者的 WebSocket 服务器。由于浏览器会自动携带受害者的 Cookie,如果服务器不验证 Origin,攻击者就能以受害者身份建立连接。
攻击场景示例:
1. 用户登录了 bank.example.com,Cookie 中包含会话凭证
2. 用户访问了攻击者的网站 evil.com
3. evil.com 的 JavaScript 尝试连接 wss://bank.example.com/ws
4. 浏览器自动携带 bank.example.com 的 Cookie
5. 如果服务器不验证 Origin,连接建立成功
6. 攻击者现在可以通过这个连接执行转账等操作
防护实现:
const allowedOrigins = [
'https://example.com',
'https://app.example.com'
];
const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const origin = info.origin;
// 注意:Origin 头部可能为空(非浏览器客户端)
// 对于浏览器客户端,origin 必须存在且在白名单中
if (!origin) {
// 非浏览器客户端,使用其他认证方式
// 例如检查 URL 参数中的 token
const token = new URL(info.req.url, 'http://localhost').searchParams.get('token');
if (validateServiceToken(token)) {
callback(true);
return;
}
callback(false, 401, 'Unauthorized');
return;
}
if (!allowedOrigins.includes(origin)) {
callback(false, 403, 'Forbidden');
return;
}
callback(true);
}
});
Origin 验证要点:
- 严格匹配:使用完全匹配而非正则或部分匹配,避免
evil-example.com绕过 - 区分环境:开发环境可以放宽限制,生产环境必须严格
- 考虑子域名:明确是否允许子域名访问
// 更严格的 Origin 验证
function isValidOrigin(origin) {
try {
const url = new URL(origin);
const allowedDomains = ['example.com', 'app.example.com'];
// 完全匹配域名
if (allowedDomains.includes(url.hostname)) {
return true;
}
// 如果需要允许所有子域名(谨慎使用)
// if (url.hostname.endsWith('.example.com')) {
// return true;
// }
return false;
} catch {
return false;
}
}
速率限制
class RateLimiter {
constructor(maxMessages = 100, windowMs = 60000) {
this.limits = new Map();
this.maxMessages = maxMessages;
this.windowMs = windowMs;
}
check(clientId) {
const now = Date.now();
let limit = this.limits.get(clientId);
if (!limit || now - limit.windowStart > this.windowMs) {
limit = { count: 0, windowStart: now };
this.limits.set(clientId, limit);
}
limit.count++;
return limit.count <= this.maxMessages;
}
remaining(clientId) {
const limit = this.limits.get(clientId);
if (!limit) return this.maxMessages;
return Math.max(0, this.maxMessages - limit.count);
}
}
const limiter = new RateLimiter(100, 60000);
ws.on('message', (data) => {
if (!limiter.check(ws.id)) {
ws.send(JSON.stringify({ error: 'Rate limit exceeded' }));
ws.close(1008, 'Rate limit exceeded');
return;
}
handleMessage(data);
});
消息验证
const Joi = require('joi');
const messageSchema = Joi.object({
type: Joi.string().valid('chat', 'join', 'leave').required(),
content: Joi.string().max(1000).when('type', {
is: 'chat',
then: Joi.required()
}),
room: Joi.string().max(50).when('type', {
is: 'join',
then: Joi.required()
})
});
function validateMessage(data) {
const { error, value } = messageSchema.validate(data);
if (error) {
throw new Error(`Validation error: ${error.message}`);
}
return value;
}
ws.on('message', (rawData) => {
try {
const data = JSON.parse(rawData);
const validatedData = validateMessage(data);
handleMessage(ws, validatedData);
} catch (error) {
ws.send(JSON.stringify({ error: error.message }));
}
});
性能优化
消息压缩
WebSocket 协议支持 permessage-deflate 扩展,使用 DEFLATE 算法压缩消息。压缩可以显著减少传输数据量,但需要权衡 CPU 开销和网络带宽收益。
压缩效果分析:
| 数据类型 | 原始大小 | 压缩后大小 | 压缩率 |
|---|---|---|---|
| JSON 文本(重复键) | 10KB | 1-2KB | 80-90% |
| JSON 文本(唯一键) | 10KB | 3-5KB | 50-70% |
| 纯文本内容 | 10KB | 2-4KB | 60-80% |
| 已压缩数据(图片、视频) | 10KB | 10-11KB | 无收益或负收益 |
| 二进制协议数据 | 10KB | 6-8KB | 20-40% |
Node.js ws 库配置:
const WebSocket = require('ws');
const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3 // 压缩级别 1-9,1最快压缩率最低,9最慢压缩率最高
},
zlibInflateOptions: {
chunkSize: 10 * 1024 // 解压缓冲区大小
},
clientNoContextTakeover: true, // 客户端不保留压缩上下文
serverNoContextTakeover: true, // 服务端不保留压缩上下文
clientMaxWindowBits: 10, // 客户端滑动窗口大小
serverMaxWindowBits: 10 // 服务端滑动窗口大小
}
});
配置参数详解:
-
level:压缩级别,1-9。推荐值 3-6,平衡压缩率和 CPU 开销。实时应用建议使用 1-3,批量传输可用 6-9。
-
contextTakeover:是否保留跨消息的压缩上下文。
true(默认):保留上下文,利用历史数据提高压缩率,但内存占用更高false:每条消息独立压缩,内存占用低,但压缩率稍低
-
windowBits:滑动窗口大小,影响压缩率和内存。默认 15,减少到 10-12 可降低内存但略微降低压缩率。
何时启用压缩:
适合压缩的场景:
- 大量重复结构的 JSON 消息(如聊天、状态更新)
- 文本内容为主的消息流
- 带宽受限的环境(移动网络、IoT 设备)
- 服务端向客户端推送大量数据
不适合压缩的场景:
- 已压缩数据(图片、视频、压缩文件)
- 极短消息(压缩后可能更大)
- 服务端 CPU 已经是瓶颈
- 低延迟要求的场景(压缩会增加处理延迟)
性能建议:
// 条件性启用压缩
function shouldCompress(message) {
const size = Buffer.byteLength(message);
// 小于 100 字节不压缩,压缩收益低
if (size < 100) return false;
// 检测是否为已压缩数据
if (isCompressedData(message)) return false;
return true;
}
// 动态压缩配置
const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: false // 默认不启用
});
// 对特定连接启用压缩
wss.on('connection', (ws, req) => {
const needCompress = req.headers['x-enable-compress'] === 'true';
if (needCompress) {
// 为此连接启用压缩扩展
}
});
批量处理
class MessageBatcher {
constructor(sendCallback, options = {}) {
this.sendCallback = sendCallback;
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 100;
this.batch = [];
this.timer = null;
}
add(message) {
this.batch.push(message);
if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}
flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
if (this.batch.length === 0) return;
const messages = this.batch;
this.batch = [];
this.sendCallback(messages);
}
}
const batcher = new MessageBatcher((messages) => {
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ batch: messages }));
}
});
});
连接分组
class RoomManager {
constructor() {
this.rooms = new Map();
this.clientRooms = new Map();
}
join(roomId, clientId, connection) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Map());
}
this.rooms.get(roomId).set(clientId, connection);
this.clientRooms.set(clientId, roomId);
}
leave(clientId) {
const roomId = this.clientRooms.get(clientId);
if (roomId) {
this.rooms.get(roomId)?.delete(clientId);
this.clientRooms.delete(clientId);
}
}
broadcast(roomId, message, excludeClient = null) {
const room = this.rooms.get(roomId);
if (!room) return;
const data = JSON.stringify(message);
for (const [clientId, connection] of room) {
if (clientId !== excludeClient && connection.readyState === WebSocket.OPEN) {
connection.send(data);
}
}
}
}
监控与日志
生产环境的 WebSocket 服务需要完善的监控体系,以便及时发现和定位问题。监控应该覆盖连接状态、消息吞吐量、延迟等关键指标。
关键性能指标
核心监控指标:
| 指标类型 | 指标名称 | 说明 | 告警阈值建议 |
|---|---|---|---|
| 连接 | 当前连接数 | 活跃的 WebSocket 连接数 | 超过容量 80% |
| 连接 | 连接建立速率 | 每秒新增连接数 | 根据业务峰值设置 |
| 连接 | 连接断开率 | 非正常关闭比例 | 超过 5% |
| 消息 | 消息吞吐量 | 每秒收发消息数 | 根据业务设置 |
| 消息 | 消息延迟 | 从发送到接收的时间 | 超过 100ms |
| 消息 | 消息大小 | 平均消息大小 | 异常增大时告警 |
| 资源 | 内存使用 | 进程内存占用 | 超过 80% |
| 资源 | CPU 使用 | 进程 CPU 占用 | 持续超过 70% |
| 错误 | 错误率 | 错误事件比例 | 超过 1% |
连接监控
class WebSocketMonitor {
constructor() {
this.stats = {
totalConnections: 0,
activeConnections: 0,
messagesReceived: 0,
messagesSent: 0,
errors: 0
};
this.startTime = Date.now();
}
onConnect() {
this.stats.totalConnections++;
this.stats.activeConnections++;
}
onDisconnect() {
this.stats.activeConnections--;
}
onMessageReceived() {
this.stats.messagesReceived++;
}
onMessageSent() {
this.stats.messagesSent++;
}
onError() {
this.stats.errors++;
}
getStats() {
return {
...this.stats,
uptime: Date.now() - this.startTime,
uptimeSeconds: Math.floor((Date.now() - this.startTime) / 1000)
};
}
}
const monitor = new WebSocketMonitor();
wss.on('connection', (ws) => {
monitor.onConnect();
ws.on('message', (data) => {
monitor.onMessageReceived();
});
ws.on('close', () => {
monitor.onDisconnect();
});
ws.on('error', () => {
monitor.onError();
});
});
setInterval(() => {
console.log('WebSocket 统计:', monitor.getStats());
}, 60000);
告警机制
基于监控数据实现告警:
class WebSocketAlerter {
constructor(monitor, options = {}) {
this.monitor = monitor;
this.thresholds = {
connectionCount: options.connectionCount || 8000, // 80% of 10000
errorRate: options.errorRate || 0.05, // 5%
messageLatency: options.messageLatency || 100, // 100ms
...options.thresholds
};
this.alertCooldown = new Map(); // 防止告警风暴
this.cooldownPeriod = options.cooldownPeriod || 300000; // 5分钟
}
check() {
const stats = this.monitor.getStats();
// 检查连接数
if (stats.activeConnections > this.thresholds.connectionCount) {
this.alert('high_connections',
`连接数过高: ${stats.activeConnections} / ${this.thresholds.connectionCount}`);
}
// 检查错误率
const errorRate = stats.errors / (stats.messagesReceived || 1);
if (errorRate > this.thresholds.errorRate) {
this.alert('high_error_rate',
`错误率过高: ${(errorRate * 100).toFixed(2)}%`);
}
// 检查平均延迟(如果有记录)
if (stats.avgLatency > this.thresholds.messageLatency) {
this.alert('high_latency',
`消息延迟过高: ${stats.avgLatency}ms`);
}
}
alert(type, message) {
const now = Date.now();
const lastAlert = this.alertCooldown.get(type);
// 检查冷却期
if (lastAlert && now - lastAlert < this.cooldownPeriod) {
return;
}
this.alertCooldown.set(type, now);
// 发送告警
console.error(`[ALERT][${type}] ${message}`);
// 可以集成到告警系统
// this.sendToSlack(type, message);
// this.sendToPagerDuty(type, message);
}
}
// 使用
const alerter = new WebSocketAlerter(monitor, {
connectionCount: 8000,
errorRate: 0.05,
messageLatency: 100
});
setInterval(() => alerter.check(), 60000);
性能数据采集
class PerformanceMonitor {
constructor() {
this.latencies = [];
this.maxSamples = 1000;
}
// 记录消息延迟
recordLatency(sendTime, receiveTime) {
const latency = receiveTime - sendTime;
this.latencies.push(latency);
if (this.latencies.length > this.maxSamples) {
this.latencies.shift();
}
}
// 计算延迟统计
getLatencyStats() {
if (this.latencies.length === 0) {
return { min: 0, max: 0, avg: 0, p50: 0, p95: 0, p99: 0 };
}
const sorted = [...this.latencies].sort((a, b) => a - b);
const sum = sorted.reduce((a, b) => a + b, 0);
return {
min: sorted[0],
max: sorted[sorted.length - 1],
avg: sum / sorted.length,
p50: sorted[Math.floor(sorted.length * 0.5)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)]
};
}
}
// 集成到消息处理中
const perfMonitor = new PerformanceMonitor();
ws.on('message', (data) => {
const msg = JSON.parse(data);
// 如果消息包含时间戳,计算延迟
if (msg.timestamp) {
perfMonitor.recordLatency(msg.timestamp, Date.now());
}
// 处理消息...
});
结构化日志
const winston = require('winston');
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'websocket.log' })
]
});
function logConnection(ws, action, data = {}) {
logger.info({
type: 'websocket',
action: action,
clientId: ws.id,
ip: ws._socket?.remoteAddress,
...data
});
}
wss.on('connection', (ws, req) => {
ws.id = generateId();
logConnection(ws, 'connect');
ws.on('message', (data) => {
logConnection(ws, 'message', { size: data.length });
});
ws.on('close', (code, reason) => {
logConnection(ws, 'disconnect', { code, reason: reason.toString() });
});
});
小结
本章总结了 WebSocket 开发的最佳实践:
- 连接管理:连接池、状态追踪、优雅关闭
- 心跳机制:客户端和服务端心跳实现
- 错误处理:重连机制、错误恢复
- 安全考虑:认证、Origin 验证、速率限制、消息验证
- 性能优化:压缩、批量处理、连接分组
- 监控日志:统计信息、结构化日志
遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用。
消息格式设计
良好的消息格式设计对于 WebSocket 应用的可维护性和扩展性至关重要。
消息结构规范
推荐的消息结构:
// 基本消息结构
{
"type": "message_type", // 必需:消息类型
"id": "unique_message_id", // 推荐:消息唯一标识
"timestamp": 1234567890, // 推荐:消息时间戳
"data": { // 消息负载
// 具体数据
}
}
完整示例:
// 聊天消息
{
"type": "chat",
"id": "msg_abc123",
"timestamp": 1704067200000,
"data": {
"from": "user_001",
"to": "room_general",
"content": "大家好!"
}
}
// 系统通知
{
"type": "notification",
"id": "notif_xyz789",
"timestamp": 1704067201000,
"data": {
"level": "info",
"title": "新用户加入",
"content": "张三 加入了聊天室"
}
}
// 错误响应
{
"type": "error",
"id": "err_001",
"timestamp": 1704067202000,
"data": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "发送频率超限,请稍后重试",
"retryAfter": 60
}
}
类型定义规范
使用常量定义消息类型,避免硬编码字符串:
// message-types.js
export const MessageTypes = {
// 连接相关
CONNECT: 'connect',
DISCONNECT: 'disconnect',
HEARTBEAT: 'heartbeat',
// 聊天相关
CHAT: 'chat',
CHAT_TYPING: 'chat_typing',
CHAT_READ: 'chat_read',
// 系统相关
NOTIFICATION: 'notification',
ERROR: 'error',
// 业务相关
SUBSCRIBE: 'subscribe',
UNSUBSCRIBE: 'unsubscribe'
};
// 使用
socket.send(JSON.stringify({
type: MessageTypes.CHAT,
data: { content: '你好' }
}));
TypeScript 类型定义
对于 TypeScript 项目,定义完整的消息类型:
// types/websocket.ts
interface BaseMessage<T extends string, D> {
type: T;
id: string;
timestamp: number;
data: D;
}
interface ChatData {
from: string;
to: string;
content: string;
}
interface NotificationData {
level: 'info' | 'warning' | 'error';
title: string;
content: string;
}
interface ErrorData {
code: string;
message: string;
retryAfter?: number;
}
type ChatMessage = BaseMessage<'chat', ChatData>;
type NotificationMessage = BaseMessage<'notification', NotificationData>;
type ErrorMessage = BaseMessage<'error', ErrorData>;
type WebSocketMessage = ChatMessage | NotificationMessage | ErrorMessage;
// 类型守卫
function isChatMessage(msg: WebSocketMessage): msg is ChatMessage {
return msg.type === 'chat';
}
function isErrorMessage(msg: WebSocketMessage): msg is ErrorMessage {
return msg.type === 'error';
}
// 使用
socket.onmessage = (event) => {
const msg: WebSocketMessage = JSON.parse(event.data);
if (isChatMessage(msg)) {
handleChat(msg.data);
} else if (isErrorMessage(msg)) {
showError(msg.data.message);
}
};
消息确认机制
对于重要消息,实现确认机制确保消息送达:
class ReliableWebSocket {
constructor(url) {
this.url = url;
this.socket = null;
this.pendingMessages = new Map(); // 等待确认的消息
this.messageId = 0;
this.connect();
}
connect() {
this.socket = new WebSocket(this.url);
this.socket.onmessage = (event) => {
const msg = JSON.parse(event.data);
// 处理确认消息
if (msg.type === 'ack') {
this.pendingMessages.delete(msg.data.originalId);
return;
}
// 发送确认
if (msg.id) {
this.socket.send(JSON.stringify({
type: 'ack',
data: { originalId: msg.id }
}));
}
// 处理业务消息
this.handleMessage(msg);
};
}
// 发送需要确认的消息
sendReliable(data, timeout = 5000) {
const id = `msg_${++this.messageId}`;
const message = { ...data, id };
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingMessages.delete(id);
reject(new Error('消息确认超时'));
}, timeout);
this.pendingMessages.set(id, { message, resolve, reject, timer });
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
});
}
// 重发未确认的消息
resendPending() {
this.pendingMessages.forEach(({ message }, id) => {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
});
}
}
客户端重连策略
健壮的重连策略是生产级 WebSocket 应用的基础。
指数退避算法
class ExponentialBackoff {
constructor(options = {}) {
this.baseDelay = options.baseDelay || 1000; // 初始延迟 1 秒
this.maxDelay = options.maxDelay || 30000; // 最大延迟 30 秒
this.jitter = options.jitter || true; // 是否添加随机抖动
this.multiplier = options.multiplier || 2; // 乘数
this.attempt = 0;
}
nextDelay() {
const delay = Math.min(
this.baseDelay * Math.pow(this.multiplier, this.attempt),
this.maxDelay
);
this.attempt++;
// 添加随机抖动,避免多个客户端同时重连
if (this.jitter) {
return delay + Math.random() * 1000;
}
return delay;
}
reset() {
this.attempt = 0;
}
}
// 使用
const backoff = new ExponentialBackoff();
socket.onclose = () => {
const delay = backoff.nextDelay();
console.log(`${delay / 1000} 秒后重连`);
setTimeout(reconnect, delay);
};
socket.onopen = () => {
backoff.reset(); // 连接成功后重置
};
完整的重连客户端
class RobustWebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: Infinity, // 最大重连次数
reconnectOnClose: true, // 关闭时是否重连
reconnectOnError: true, // 错误时是否重连
...options
};
this.socket = null;
this.reconnectAttempts = 0;
this.backoff = new ExponentialBackoff();
this.messageQueue = []; // 离线消息队列
this.eventHandlers = new Map(); // 事件处理器
this.connect();
}
connect() {
try {
this.socket = new WebSocket(this.url);
this.setupEventHandlers();
} catch (error) {
console.error('创建连接失败:', error);
this.scheduleReconnect();
}
}
setupEventHandlers() {
this.socket.onopen = (event) => {
console.log('连接成功');
this.reconnectAttempts = 0;
this.backoff.reset();
this.flushMessageQueue();
this.emit('open', event);
};
this.socket.onmessage = (event) => {
this.emit('message', event);
};
this.socket.onerror = (event) => {
console.error('连接错误');
this.emit('error', event);
};
this.socket.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason);
this.emit('close', event);
// 根据关闭码决定是否重连
if (this.shouldReconnect(event.code)) {
this.scheduleReconnect();
}
};
}
shouldReconnect(code) {
// 1000: 正常关闭,不需要重连
// 1001: 端点离开,不需要重连
// 1008: 策略违规,可能需要重新认证
// 其他情况通常需要重连
if (code === 1000 || code === 1001) {
return false;
}
return this.reconnectAttempts < this.options.maxReconnectAttempts;
}
scheduleReconnect() {
const delay = this.backoff.nextDelay();
this.reconnectAttempts++;
console.log(`第 ${this.reconnectAttempts} 次重连,${Math.round(delay / 1000)} 秒后执行`);
this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
}
send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
return true;
}
// 连接未打开,加入队列
this.messageQueue.push(data);
return false;
}
flushMessageQueue() {
while (this.messageQueue.length > 0 &&
this.socket.readyState === WebSocket.OPEN) {
const data = this.messageQueue.shift();
this.send(data);
}
}
on(event, handler) {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event).push(handler);
}
off(event, handler) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}
emit(event, data) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
handlers.forEach(handler => handler(data));
}
}
close(code = 1000, reason = 'Normal closure') {
// 手动关闭时不再重连
this.options.reconnectOnClose = false;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.socket) {
this.socket.close(code, reason);
}
}
}
// 使用示例
const client = new RobustWebSocketClient('wss://example.com/ws', {
maxReconnectAttempts: 10
});
client.on('message', (event) => {
console.log('收到消息:', event.data);
});
client.on('open', () => {
console.log('连接已建立');
});
client.send({ type: 'hello' }); // 即使未连接也会排队等待
大规模部署考量
当 WebSocket 应用需要支持大量并发连接时,架构设计需要特别注意。
连接数估算
在规划服务器容量时,需要对并发连接数进行合理估算:
单台服务器理论最大连接数 = 文件描述符限制 - 系统保留
# Linux 默认文件描述符限制通常为 1024
# 需要调整系统限制:
ulimit -n 65535
# 每个连接占用内存估算:
# 读缓冲区 (默认 64KB) + 写缓冲区 (默认 64KB) + 用户数据
# 约 128KB-256KB 每连接
# 8GB 内存服务器理论连接上限:
# 8GB / 256KB ≈ 32000 连接
# 实际建议预留 50% 余量,约 15000-20000 连接
水平扩展策略
WebSocket 是有状态的,这与无状态的 HTTP 请求不同,需要特殊的扩展策略:
方案一:粘性会话(Sticky Sessions)
负载均衡器根据客户端 IP 或 Cookie 将请求路由到同一后端服务器。优点是简单易实现,缺点是可能导致负载不均衡。
# Nginx 粘性会话配置
upstream websocket {
ip_hash;
server 10.0.0.1:8080;
server 10.0.0.2:8080;
server 10.0.0.3:8080;
}
方案二:消息广播(Pub/Sub)
使用 Redis、Kafka 或专门的消息代理在服务器间同步消息。每台服务器订阅消息频道,收到消息后广播给本地连接。
// Node.js 使用 Redis 发布订阅
const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();
subscriber.subscribe('chat:messages');
subscriber.on('message', (channel, message) => {
// 广播给本服务器的所有连接
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
});
// 收到客户端消息时发布到 Redis
ws.on('message', (data) => {
publisher.publish('chat:messages', data);
});
方案三:专用消息服务
对于超大规模应用,可以使用专门的消息服务如:
- Redis Streams:支持消费者组和消息持久化
- Apache Kafka:高吞吐量、持久化消息队列
- NATS:轻量级消息系统
- RabbitMQ:功能丰富的消息代理
连接迁移与优雅关闭
在进行服务更新或扩缩容时,需要实现连接的平滑迁移:
// 服务端优雅关闭示例
async function gracefulShutdown() {
console.log('开始优雅关闭...');
// 1. 停止接受新连接
wss.close();
// 2. 通知所有客户端即将关闭
const shutdownMessage = JSON.stringify({
type: 'shutdown',
reason: 'server_maintenance',
retryAfter: 30 // 秒
});
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(shutdownMessage);
}
});
// 3. 等待客户端断开或超时
await new Promise(resolve => {
const checkInterval = setInterval(() => {
if (wss.clients.size === 0) {
clearInterval(checkInterval);
resolve();
}
}, 1000);
// 最多等待 30 秒
setTimeout(() => {
clearInterval(checkInterval);
resolve();
}, 30000);
});
// 4. 强制关闭剩余连接
wss.clients.forEach(client => {
client.terminate();
});
console.log('所有连接已关闭');
process.exit(0);
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
跨数据中心部署
对于全球用户,可能需要跨数据中心部署:
跨数据中心部署需要考虑:
- 延迟:跨区域消息同步会增加延迟
- 一致性:需要选择合适的一致性级别
- 故障转移:单个数据中心故障时的处理策略
小结
本章全面总结了构建生产级 WebSocket 应用的最佳实践:
连接管理:
- 使用连接池有效管理大量并发连接
- 实现连接状态追踪,记录连接的生命周期
- 设计合理的连接超时和淘汰策略
心跳机制:
- 客户端实现应用层心跳检测连接状态
- 服务端使用 Ping/Pong 帧检测断开的连接
- 结合指数退避算法处理重连
错误处理:
- 客户端实现健壮的重连策略和消息队列
- 服务端正确处理各类异常情况
- 通过关闭码快速定位问题原因
安全考虑:
- 生产环境必须使用 WSS 加密传输
- 实现 Origin 验证防止 CSWSH 攻击
- 选择合适的认证方案(Cookie、URL 参数、首条消息认证)
- 实现速率限制和消息验证
性能优化:
- 根据数据类型决定是否启用压缩
- 批量处理消息减少系统调用
- 合理分组连接提高广播效率
监控与日志:
- 监控关键指标(连接数、吞吐量、延迟、错误率)
- 实现告警机制及时发现问题
- 使用结构化日志便于排查问题
大规模部署:
- 使用 Redis 等消息代理实现跨服务器通信
- 实现优雅关闭确保消息不丢失
- 跨数据中心部署需要权衡延迟和一致性
遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用,满足生产环境的严苛要求。