跳到主要内容

最佳实践

构建生产级别的 WebSocket 应用需要考虑连接管理、错误处理、安全性和性能优化等多个方面。本章总结 WebSocket 开发的最佳实践。

连接管理

连接池管理

对于高并发场景,需要有效管理连接池:

class ConnectionPool {
constructor(maxConnections = 10000) {
this.connections = new Map();
this.maxConnections = maxConnections;
}

add(id, connection) {
if (this.connections.size >= this.maxConnections) {
this.evictOldest();
}
this.connections.set(id, {
connection,
lastActivity: Date.now(),
metadata: {}
});
}

evictOldest() {
let oldest = null;
let oldestTime = Infinity;

for (const [id, data] of this.connections) {
if (data.lastActivity < oldestTime) {
oldestTime = data.lastActivity;
oldest = id;
}
}

if (oldest) {
const conn = this.connections.get(oldest);
conn.connection.close(1001, '连接超时');
this.connections.delete(oldest);
}
}

updateActivity(id) {
const conn = this.connections.get(id);
if (conn) {
conn.lastActivity = Date.now();
}
}
}

连接状态追踪

class ConnectionState {
constructor() {
this.state = 'disconnected';
this.retryCount = 0;
this.lastConnected = null;
this.lastDisconnected = null;
}

connect() {
this.state = 'connecting';
}

connected() {
this.state = 'connected';
this.retryCount = 0;
this.lastConnected = Date.now();
}

disconnect() {
this.state = 'disconnected';
this.lastDisconnected = Date.now();
}

shouldReconnect() {
if (this.state === 'connected') return false;
if (this.retryCount >= 5) return false;
return true;
}

getRetryDelay() {
const baseDelay = 1000;
const maxDelay = 30000;
const delay = Math.min(baseDelay * Math.pow(2, this.retryCount), maxDelay);
return delay + Math.random() * 1000;
}
}

心跳机制

心跳机制对于检测断开的连接至关重要。

客户端心跳

class HeartbeatManager {
constructor(socket, options = {}) {
this.socket = socket;
this.interval = options.interval || 30000;
this.timeout = options.timeout || 5000;
this.maxMissed = options.maxMissed || 3;

this.missedHeartbeats = 0;
this.timer = null;
this.timeoutTimer = null;
}

start() {
this.stop();
this.timer = setInterval(() => this.sendPing(), this.interval);
}

stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}

sendPing() {
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}

this.missedHeartbeats++;

if (this.missedHeartbeats > this.maxMissed) {
this.stop();
this.socket.close(1001, '心跳超时');
return;
}

this.socket.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));

this.timeoutTimer = setTimeout(() => {
if (this.missedHeartbeats > 0) {
console.warn('心跳响应超时');
}
}, this.timeout);
}

onPong() {
this.missedHeartbeats = 0;
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}
}

服务端心跳

function setupHeartbeat(wss, interval = 30000) {
const clients = new Map();

wss.on('connection', (ws) => {
clients.set(ws, { isAlive: true, lastPong: Date.now() });

ws.on('pong', () => {
const client = clients.get(ws);
if (client) {
client.isAlive = true;
client.lastPong = Date.now();
}
});

ws.on('close', () => {
clients.delete(ws);
});
});

const timer = setInterval(() => {
wss.clients.forEach((ws) => {
const client = clients.get(ws);

if (!client || !client.isAlive) {
ws.terminate();
clients.delete(ws);
return;
}

client.isAlive = false;
ws.ping();
});
}, interval);

wss.on('close', () => {
clearInterval(timer);
});
}

错误处理

客户端错误处理

class RobustWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: 5,
retryDelay: 1000,
maxRetryDelay: 30000,
...options
};

this.socket = null;
this.retryCount = 0;
this.forcedClose = false;
this.messageQueue = [];

this.handlers = {
open: [],
message: [],
error: [],
close: []
};

this.connect();
}

connect() {
try {
this.socket = new WebSocket(this.url);

this.socket.onopen = (event) => {
this.retryCount = 0;
this.flushQueue();
this.emit('open', event);
};

this.socket.onmessage = (event) => {
this.emit('message', event);
};

this.socket.onerror = (event) => {
this.emit('error', event);
};

this.socket.onclose = (event) => {
this.emit('close', event);

if (!this.forcedClose && this.shouldReconnect()) {
this.scheduleReconnect();
}
};
} catch (error) {
this.emit('error', error);
if (this.shouldReconnect()) {
this.scheduleReconnect();
}
}
}

shouldReconnect() {
return this.retryCount < this.options.maxRetries;
}

scheduleReconnect() {
this.retryCount++;
const delay = Math.min(
this.options.retryDelay * Math.pow(2, this.retryCount - 1),
this.options.maxRetryDelay
);

setTimeout(() => this.connect(), delay);
}

send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(data);
} else {
this.messageQueue.push(data);
}
}

flushQueue() {
while (this.messageQueue.length > 0) {
const data = this.messageQueue.shift();
this.socket.send(data);
}
}

close(code = 1000, reason = 'Normal closure') {
this.forcedClose = true;
if (this.socket) {
this.socket.close(code, reason);
}
}

on(event, handler) {
if (this.handlers[event]) {
this.handlers[event].push(handler);
}
}

emit(event, data) {
this.handlers[event]?.forEach(handler => handler(data));
}
}

服务端错误处理

function handleConnection(ws, req) {
let isAuthenticated = false;

try {
isAuthenticated = authenticateConnection(req);
} catch (error) {
ws.close(1008, 'Authentication failed');
return;
}

if (!isAuthenticated) {
ws.close(1008, 'Unauthorized');
return;
}

ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleMessage(ws, message);
} catch (error) {
if (error instanceof SyntaxError) {
ws.send(JSON.stringify({ error: 'Invalid JSON' }));
} else {
console.error('处理消息错误:', error);
ws.send(JSON.stringify({ error: 'Internal error' }));
}
}
});

ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});

ws.on('close', (code, reason) => {
console.log(`连接关闭: ${code} - ${reason}`);
cleanupConnection(ws);
});
}

安全考虑

WebSocket 安全涉及多个层面,包括传输安全、认证授权、跨站攻击防护等。理解这些安全威胁和防护措施对于构建生产级应用至关重要。

传输安全

始终使用 WSS(WebSocket Secure)

在生产环境中,必须使用 wss:// 而非 ws://。WSS 在 WebSocket 之上添加了 TLS 加密层,提供以下保护:

威胁WS 漏洞WSS 保护
中间人窃听数据明文传输TLS 加密
数据篡改无完整性保护TLS MAC 校验
身份伪造无法验证服务器证书验证
会话劫持Cookie 明文传输加密传输
// 正确做法:根据页面协议自动选择
const protocol = location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${location.host}/ws`;
const socket = new WebSocket(wsUrl);

// 错误做法:HTTPS 页面使用 WS 会被浏览器阻止
// Mixed Content: The page was loaded over HTTPS, but attempted to connect to the insecure WebSocket endpoint 'ws://...'

认证机制

WebSocket 协议本身不定义认证机制,认证需要在应用层实现。以下是几种常见的认证方案及其优缺点:

方案对比

方案实现方式优点缺点适用场景
URL 参数ws://host?token=xxx简单直接Token 暴露在日志中开发测试
Cookie同域 Cookie 自动携带安全,不暴露仅限同域同域应用
子协议new WebSocket(url, [token])不暴露在 URL滥用子协议语义特殊场景
首条消息认证连接后发送认证灵活连接消耗资源复杂认证流程

方案一:URL 参数认证

最简单但安全性较低的方案:

// 客户端
const token = localStorage.getItem('token');
const socket = new WebSocket(`wss://api.example.com/ws?token=${token}`);

// 服务端
const jwt = require('jsonwebtoken');

function authenticateConnection(req) {
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get('token');

if (!token) {
return { authenticated: false, reason: 'Missing token' };
}

try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
return { authenticated: true, user: decoded };
} catch (error) {
return { authenticated: false, reason: 'Invalid token' };
}
}

const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const result = authenticateConnection(info.req);
callback(result.authenticated, result.authenticated ? 200 : 401, result.reason);
}
});

安全注意事项

  • Token 会出现在服务器访问日志、代理日志、浏览器历史记录中
  • 仅适用于短期有效的一次性 Token
  • 生产环境不推荐

方案二:Cookie 认证

最安全的方式,利用现有会话:

// 客户端:同域情况下 Cookie 自动发送
const socket = new WebSocket('wss://api.example.com/ws');

// 服务端:验证 Cookie 中的会话
const cookieParser = require('cookie-parser');
const session = require('express-session');

// Express 配置
app.use(cookieParser());
app.use(session({
secret: process.env.SESSION_SECRET,
cookie: { secure: true, sameSite: 'strict' }
}));

// WebSocket 认证
const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
// 解析 Cookie
const cookies = cookieParser.signedCookies(
require('cookie').parse(info.req.headers.cookie || ''),
process.env.SESSION_SECRET
);

const sessionId = cookies['connect.sid'];
if (!sessionId) {
callback(false, 401, 'No session');
return;
}

// 从会话存储获取用户信息
sessionStore.get(sessionId, (err, session) => {
if (err || !session || !session.user) {
callback(false, 401, 'Invalid session');
return;
}

info.req.user = session.user;
callback(true);
});
}
});

方案三:首条消息认证

连接后通过首条消息完成认证,适合复杂认证流程:

// 服务端
const pendingAuth = new Map();

wss.on('connection', (ws, req) => {
// 连接初始为未认证状态
ws.isAuthenticated = false;
ws.authTimeout = setTimeout(() => {
if (!ws.isAuthenticated) {
ws.close(1008, 'Authentication timeout');
}
}, 5000); // 5秒内必须完成认证

ws.on('message', (data) => {
// 未认证时只处理认证消息
if (!ws.isAuthenticated) {
try {
const msg = JSON.parse(data);
if (msg.type === 'auth') {
const user = verifyToken(msg.token);
if (user) {
ws.isAuthenticated = true;
ws.user = user;
clearTimeout(ws.authTimeout);
ws.send(JSON.stringify({ type: 'auth_success' }));
} else {
ws.close(1008, 'Authentication failed');
}
}
} catch (e) {
ws.close(1002, 'Invalid message');
}
return;
}

// 已认证,正常处理消息
handleMessage(ws, data);
});
});

// 客户端
const socket = new WebSocket('wss://api.example.com/ws');

socket.onopen = () => {
// 立即发送认证消息
socket.send(JSON.stringify({
type: 'auth',
token: localStorage.getItem('token')
}));
};

socket.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'auth_success') {
console.log('认证成功');
// 开始正常通信
}
};

最佳实践总结

  1. 同域应用优先使用 Cookie:利用现有会话机制,最安全便捷
  2. 跨域应用使用 URL 参数 + 短期 Token:生成一次性连接 Token,使用后立即失效
  3. 复杂认证使用首条消息:支持 OAuth、多因素认证等复杂流程
  4. 始终设置认证超时:防止未认证连接占用资源

Origin 验证

跨站 WebSocket 劫持攻击(CSWSH)

CSWSH 是一种类似于 CSRF 的攻击方式。攻击者在其控制的网站上嵌入 JavaScript,尝试连接受害者的 WebSocket 服务器。由于浏览器会自动携带受害者的 Cookie,如果服务器不验证 Origin,攻击者就能以受害者身份建立连接。

攻击场景示例

1. 用户登录了 bank.example.com,Cookie 中包含会话凭证
2. 用户访问了攻击者的网站 evil.com
3. evil.com 的 JavaScript 尝试连接 wss://bank.example.com/ws
4. 浏览器自动携带 bank.example.com 的 Cookie
5. 如果服务器不验证 Origin,连接建立成功
6. 攻击者现在可以通过这个连接执行转账等操作

防护实现

const allowedOrigins = [
'https://example.com',
'https://app.example.com'
];

const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const origin = info.origin;

// 注意:Origin 头部可能为空(非浏览器客户端)
// 对于浏览器客户端,origin 必须存在且在白名单中
if (!origin) {
// 非浏览器客户端,使用其他认证方式
// 例如检查 URL 参数中的 token
const token = new URL(info.req.url, 'http://localhost').searchParams.get('token');
if (validateServiceToken(token)) {
callback(true);
return;
}
callback(false, 401, 'Unauthorized');
return;
}

if (!allowedOrigins.includes(origin)) {
callback(false, 403, 'Forbidden');
return;
}

callback(true);
}
});

Origin 验证要点

  1. 严格匹配:使用完全匹配而非正则或部分匹配,避免 evil-example.com 绕过
  2. 区分环境:开发环境可以放宽限制,生产环境必须严格
  3. 考虑子域名:明确是否允许子域名访问
// 更严格的 Origin 验证
function isValidOrigin(origin) {
try {
const url = new URL(origin);
const allowedDomains = ['example.com', 'app.example.com'];

// 完全匹配域名
if (allowedDomains.includes(url.hostname)) {
return true;
}

// 如果需要允许所有子域名(谨慎使用)
// if (url.hostname.endsWith('.example.com')) {
// return true;
// }

return false;
} catch {
return false;
}
}

速率限制

class RateLimiter {
constructor(maxMessages = 100, windowMs = 60000) {
this.limits = new Map();
this.maxMessages = maxMessages;
this.windowMs = windowMs;
}

check(clientId) {
const now = Date.now();
let limit = this.limits.get(clientId);

if (!limit || now - limit.windowStart > this.windowMs) {
limit = { count: 0, windowStart: now };
this.limits.set(clientId, limit);
}

limit.count++;

return limit.count <= this.maxMessages;
}

remaining(clientId) {
const limit = this.limits.get(clientId);
if (!limit) return this.maxMessages;
return Math.max(0, this.maxMessages - limit.count);
}
}

const limiter = new RateLimiter(100, 60000);

ws.on('message', (data) => {
if (!limiter.check(ws.id)) {
ws.send(JSON.stringify({ error: 'Rate limit exceeded' }));
ws.close(1008, 'Rate limit exceeded');
return;
}

handleMessage(data);
});

消息验证

const Joi = require('joi');

const messageSchema = Joi.object({
type: Joi.string().valid('chat', 'join', 'leave').required(),
content: Joi.string().max(1000).when('type', {
is: 'chat',
then: Joi.required()
}),
room: Joi.string().max(50).when('type', {
is: 'join',
then: Joi.required()
})
});

function validateMessage(data) {
const { error, value } = messageSchema.validate(data);
if (error) {
throw new Error(`Validation error: ${error.message}`);
}
return value;
}

ws.on('message', (rawData) => {
try {
const data = JSON.parse(rawData);
const validatedData = validateMessage(data);
handleMessage(ws, validatedData);
} catch (error) {
ws.send(JSON.stringify({ error: error.message }));
}
});

性能优化

消息压缩

WebSocket 协议支持 permessage-deflate 扩展,使用 DEFLATE 算法压缩消息。压缩可以显著减少传输数据量,但需要权衡 CPU 开销和网络带宽收益。

压缩效果分析

数据类型原始大小压缩后大小压缩率
JSON 文本(重复键)10KB1-2KB80-90%
JSON 文本(唯一键)10KB3-5KB50-70%
纯文本内容10KB2-4KB60-80%
已压缩数据(图片、视频)10KB10-11KB无收益或负收益
二进制协议数据10KB6-8KB20-40%

Node.js ws 库配置

const WebSocket = require('ws');

const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3 // 压缩级别 1-9,1最快压缩率最低,9最慢压缩率最高
},
zlibInflateOptions: {
chunkSize: 10 * 1024 // 解压缓冲区大小
},
clientNoContextTakeover: true, // 客户端不保留压缩上下文
serverNoContextTakeover: true, // 服务端不保留压缩上下文
clientMaxWindowBits: 10, // 客户端滑动窗口大小
serverMaxWindowBits: 10 // 服务端滑动窗口大小
}
});

配置参数详解

  • level:压缩级别,1-9。推荐值 3-6,平衡压缩率和 CPU 开销。实时应用建议使用 1-3,批量传输可用 6-9。

  • contextTakeover:是否保留跨消息的压缩上下文。

    • true(默认):保留上下文,利用历史数据提高压缩率,但内存占用更高
    • false:每条消息独立压缩,内存占用低,但压缩率稍低
  • windowBits:滑动窗口大小,影响压缩率和内存。默认 15,减少到 10-12 可降低内存但略微降低压缩率。

何时启用压缩

适合压缩的场景

  • 大量重复结构的 JSON 消息(如聊天、状态更新)
  • 文本内容为主的消息流
  • 带宽受限的环境(移动网络、IoT 设备)
  • 服务端向客户端推送大量数据

不适合压缩的场景

  • 已压缩数据(图片、视频、压缩文件)
  • 极短消息(压缩后可能更大)
  • 服务端 CPU 已经是瓶颈
  • 低延迟要求的场景(压缩会增加处理延迟)

性能建议

// 条件性启用压缩
function shouldCompress(message) {
const size = Buffer.byteLength(message);

// 小于 100 字节不压缩,压缩收益低
if (size < 100) return false;

// 检测是否为已压缩数据
if (isCompressedData(message)) return false;

return true;
}

// 动态压缩配置
const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: false // 默认不启用
});

// 对特定连接启用压缩
wss.on('connection', (ws, req) => {
const needCompress = req.headers['x-enable-compress'] === 'true';
if (needCompress) {
// 为此连接启用压缩扩展
}
});

批量处理

class MessageBatcher {
constructor(sendCallback, options = {}) {
this.sendCallback = sendCallback;
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 100;

this.batch = [];
this.timer = null;
}

add(message) {
this.batch.push(message);

if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}

flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}

if (this.batch.length === 0) return;

const messages = this.batch;
this.batch = [];

this.sendCallback(messages);
}
}

const batcher = new MessageBatcher((messages) => {
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ batch: messages }));
}
});
});

连接分组

class RoomManager {
constructor() {
this.rooms = new Map();
this.clientRooms = new Map();
}

join(roomId, clientId, connection) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Map());
}

this.rooms.get(roomId).set(clientId, connection);
this.clientRooms.set(clientId, roomId);
}

leave(clientId) {
const roomId = this.clientRooms.get(clientId);
if (roomId) {
this.rooms.get(roomId)?.delete(clientId);
this.clientRooms.delete(clientId);
}
}

broadcast(roomId, message, excludeClient = null) {
const room = this.rooms.get(roomId);
if (!room) return;

const data = JSON.stringify(message);

for (const [clientId, connection] of room) {
if (clientId !== excludeClient && connection.readyState === WebSocket.OPEN) {
connection.send(data);
}
}
}
}

监控与日志

生产环境的 WebSocket 服务需要完善的监控体系,以便及时发现和定位问题。监控应该覆盖连接状态、消息吞吐量、延迟等关键指标。

关键性能指标

核心监控指标

指标类型指标名称说明告警阈值建议
连接当前连接数活跃的 WebSocket 连接数超过容量 80%
连接连接建立速率每秒新增连接数根据业务峰值设置
连接连接断开率非正常关闭比例超过 5%
消息消息吞吐量每秒收发消息数根据业务设置
消息消息延迟从发送到接收的时间超过 100ms
消息消息大小平均消息大小异常增大时告警
资源内存使用进程内存占用超过 80%
资源CPU 使用进程 CPU 占用持续超过 70%
错误错误率错误事件比例超过 1%

连接监控

class WebSocketMonitor {
constructor() {
this.stats = {
totalConnections: 0,
activeConnections: 0,
messagesReceived: 0,
messagesSent: 0,
errors: 0
};

this.startTime = Date.now();
}

onConnect() {
this.stats.totalConnections++;
this.stats.activeConnections++;
}

onDisconnect() {
this.stats.activeConnections--;
}

onMessageReceived() {
this.stats.messagesReceived++;
}

onMessageSent() {
this.stats.messagesSent++;
}

onError() {
this.stats.errors++;
}

getStats() {
return {
...this.stats,
uptime: Date.now() - this.startTime,
uptimeSeconds: Math.floor((Date.now() - this.startTime) / 1000)
};
}
}

const monitor = new WebSocketMonitor();

wss.on('connection', (ws) => {
monitor.onConnect();

ws.on('message', (data) => {
monitor.onMessageReceived();
});

ws.on('close', () => {
monitor.onDisconnect();
});

ws.on('error', () => {
monitor.onError();
});
});

setInterval(() => {
console.log('WebSocket 统计:', monitor.getStats());
}, 60000);

告警机制

基于监控数据实现告警:

class WebSocketAlerter {
constructor(monitor, options = {}) {
this.monitor = monitor;
this.thresholds = {
connectionCount: options.connectionCount || 8000, // 80% of 10000
errorRate: options.errorRate || 0.05, // 5%
messageLatency: options.messageLatency || 100, // 100ms
...options.thresholds
};
this.alertCooldown = new Map(); // 防止告警风暴
this.cooldownPeriod = options.cooldownPeriod || 300000; // 5分钟
}

check() {
const stats = this.monitor.getStats();

// 检查连接数
if (stats.activeConnections > this.thresholds.connectionCount) {
this.alert('high_connections',
`连接数过高: ${stats.activeConnections} / ${this.thresholds.connectionCount}`);
}

// 检查错误率
const errorRate = stats.errors / (stats.messagesReceived || 1);
if (errorRate > this.thresholds.errorRate) {
this.alert('high_error_rate',
`错误率过高: ${(errorRate * 100).toFixed(2)}%`);
}

// 检查平均延迟(如果有记录)
if (stats.avgLatency > this.thresholds.messageLatency) {
this.alert('high_latency',
`消息延迟过高: ${stats.avgLatency}ms`);
}
}

alert(type, message) {
const now = Date.now();
const lastAlert = this.alertCooldown.get(type);

// 检查冷却期
if (lastAlert && now - lastAlert < this.cooldownPeriod) {
return;
}

this.alertCooldown.set(type, now);

// 发送告警
console.error(`[ALERT][${type}] ${message}`);

// 可以集成到告警系统
// this.sendToSlack(type, message);
// this.sendToPagerDuty(type, message);
}
}

// 使用
const alerter = new WebSocketAlerter(monitor, {
connectionCount: 8000,
errorRate: 0.05,
messageLatency: 100
});

setInterval(() => alerter.check(), 60000);

性能数据采集

class PerformanceMonitor {
constructor() {
this.latencies = [];
this.maxSamples = 1000;
}

// 记录消息延迟
recordLatency(sendTime, receiveTime) {
const latency = receiveTime - sendTime;
this.latencies.push(latency);

if (this.latencies.length > this.maxSamples) {
this.latencies.shift();
}
}

// 计算延迟统计
getLatencyStats() {
if (this.latencies.length === 0) {
return { min: 0, max: 0, avg: 0, p50: 0, p95: 0, p99: 0 };
}

const sorted = [...this.latencies].sort((a, b) => a - b);
const sum = sorted.reduce((a, b) => a + b, 0);

return {
min: sorted[0],
max: sorted[sorted.length - 1],
avg: sum / sorted.length,
p50: sorted[Math.floor(sorted.length * 0.5)],
p95: sorted[Math.floor(sorted.length * 0.95)],
p99: sorted[Math.floor(sorted.length * 0.99)]
};
}
}

// 集成到消息处理中
const perfMonitor = new PerformanceMonitor();

ws.on('message', (data) => {
const msg = JSON.parse(data);

// 如果消息包含时间戳,计算延迟
if (msg.timestamp) {
perfMonitor.recordLatency(msg.timestamp, Date.now());
}

// 处理消息...
});

结构化日志

const winston = require('winston');

const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'websocket.log' })
]
});

function logConnection(ws, action, data = {}) {
logger.info({
type: 'websocket',
action: action,
clientId: ws.id,
ip: ws._socket?.remoteAddress,
...data
});
}

wss.on('connection', (ws, req) => {
ws.id = generateId();
logConnection(ws, 'connect');

ws.on('message', (data) => {
logConnection(ws, 'message', { size: data.length });
});

ws.on('close', (code, reason) => {
logConnection(ws, 'disconnect', { code, reason: reason.toString() });
});
});

小结

本章总结了 WebSocket 开发的最佳实践:

  1. 连接管理:连接池、状态追踪、优雅关闭
  2. 心跳机制:客户端和服务端心跳实现
  3. 错误处理:重连机制、错误恢复
  4. 安全考虑:认证、Origin 验证、速率限制、消息验证
  5. 性能优化:压缩、批量处理、连接分组
  6. 监控日志:统计信息、结构化日志

遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用。

消息格式设计

良好的消息格式设计对于 WebSocket 应用的可维护性和扩展性至关重要。

消息结构规范

推荐的消息结构

// 基本消息结构
{
"type": "message_type", // 必需:消息类型
"id": "unique_message_id", // 推荐:消息唯一标识
"timestamp": 1234567890, // 推荐:消息时间戳
"data": { // 消息负载
// 具体数据
}
}

完整示例

// 聊天消息
{
"type": "chat",
"id": "msg_abc123",
"timestamp": 1704067200000,
"data": {
"from": "user_001",
"to": "room_general",
"content": "大家好!"
}
}

// 系统通知
{
"type": "notification",
"id": "notif_xyz789",
"timestamp": 1704067201000,
"data": {
"level": "info",
"title": "新用户加入",
"content": "张三 加入了聊天室"
}
}

// 错误响应
{
"type": "error",
"id": "err_001",
"timestamp": 1704067202000,
"data": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "发送频率超限,请稍后重试",
"retryAfter": 60
}
}

类型定义规范

使用常量定义消息类型,避免硬编码字符串:

// message-types.js
export const MessageTypes = {
// 连接相关
CONNECT: 'connect',
DISCONNECT: 'disconnect',
HEARTBEAT: 'heartbeat',

// 聊天相关
CHAT: 'chat',
CHAT_TYPING: 'chat_typing',
CHAT_READ: 'chat_read',

// 系统相关
NOTIFICATION: 'notification',
ERROR: 'error',

// 业务相关
SUBSCRIBE: 'subscribe',
UNSUBSCRIBE: 'unsubscribe'
};

// 使用
socket.send(JSON.stringify({
type: MessageTypes.CHAT,
data: { content: '你好' }
}));

TypeScript 类型定义

对于 TypeScript 项目,定义完整的消息类型:

// types/websocket.ts
interface BaseMessage<T extends string, D> {
type: T;
id: string;
timestamp: number;
data: D;
}

interface ChatData {
from: string;
to: string;
content: string;
}

interface NotificationData {
level: 'info' | 'warning' | 'error';
title: string;
content: string;
}

interface ErrorData {
code: string;
message: string;
retryAfter?: number;
}

type ChatMessage = BaseMessage<'chat', ChatData>;
type NotificationMessage = BaseMessage<'notification', NotificationData>;
type ErrorMessage = BaseMessage<'error', ErrorData>;

type WebSocketMessage = ChatMessage | NotificationMessage | ErrorMessage;

// 类型守卫
function isChatMessage(msg: WebSocketMessage): msg is ChatMessage {
return msg.type === 'chat';
}

function isErrorMessage(msg: WebSocketMessage): msg is ErrorMessage {
return msg.type === 'error';
}

// 使用
socket.onmessage = (event) => {
const msg: WebSocketMessage = JSON.parse(event.data);

if (isChatMessage(msg)) {
handleChat(msg.data);
} else if (isErrorMessage(msg)) {
showError(msg.data.message);
}
};

消息确认机制

对于重要消息,实现确认机制确保消息送达:

class ReliableWebSocket {
constructor(url) {
this.url = url;
this.socket = null;
this.pendingMessages = new Map(); // 等待确认的消息
this.messageId = 0;
this.connect();
}

connect() {
this.socket = new WebSocket(this.url);

this.socket.onmessage = (event) => {
const msg = JSON.parse(event.data);

// 处理确认消息
if (msg.type === 'ack') {
this.pendingMessages.delete(msg.data.originalId);
return;
}

// 发送确认
if (msg.id) {
this.socket.send(JSON.stringify({
type: 'ack',
data: { originalId: msg.id }
}));
}

// 处理业务消息
this.handleMessage(msg);
};
}

// 发送需要确认的消息
sendReliable(data, timeout = 5000) {
const id = `msg_${++this.messageId}`;
const message = { ...data, id };

return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
this.pendingMessages.delete(id);
reject(new Error('消息确认超时'));
}, timeout);

this.pendingMessages.set(id, { message, resolve, reject, timer });

if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
});
}

// 重发未确认的消息
resendPending() {
this.pendingMessages.forEach(({ message }, id) => {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
});
}
}

客户端重连策略

健壮的重连策略是生产级 WebSocket 应用的基础。

指数退避算法

class ExponentialBackoff {
constructor(options = {}) {
this.baseDelay = options.baseDelay || 1000; // 初始延迟 1 秒
this.maxDelay = options.maxDelay || 30000; // 最大延迟 30 秒
this.jitter = options.jitter || true; // 是否添加随机抖动
this.multiplier = options.multiplier || 2; // 乘数
this.attempt = 0;
}

nextDelay() {
const delay = Math.min(
this.baseDelay * Math.pow(this.multiplier, this.attempt),
this.maxDelay
);

this.attempt++;

// 添加随机抖动,避免多个客户端同时重连
if (this.jitter) {
return delay + Math.random() * 1000;
}
return delay;
}

reset() {
this.attempt = 0;
}
}

// 使用
const backoff = new ExponentialBackoff();

socket.onclose = () => {
const delay = backoff.nextDelay();
console.log(`${delay / 1000} 秒后重连`);
setTimeout(reconnect, delay);
};

socket.onopen = () => {
backoff.reset(); // 连接成功后重置
};

完整的重连客户端

class RobustWebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: Infinity, // 最大重连次数
reconnectOnClose: true, // 关闭时是否重连
reconnectOnError: true, // 错误时是否重连
...options
};

this.socket = null;
this.reconnectAttempts = 0;
this.backoff = new ExponentialBackoff();
this.messageQueue = []; // 离线消息队列
this.eventHandlers = new Map(); // 事件处理器

this.connect();
}

connect() {
try {
this.socket = new WebSocket(this.url);
this.setupEventHandlers();
} catch (error) {
console.error('创建连接失败:', error);
this.scheduleReconnect();
}
}

setupEventHandlers() {
this.socket.onopen = (event) => {
console.log('连接成功');
this.reconnectAttempts = 0;
this.backoff.reset();
this.flushMessageQueue();
this.emit('open', event);
};

this.socket.onmessage = (event) => {
this.emit('message', event);
};

this.socket.onerror = (event) => {
console.error('连接错误');
this.emit('error', event);
};

this.socket.onclose = (event) => {
console.log('连接关闭:', event.code, event.reason);
this.emit('close', event);

// 根据关闭码决定是否重连
if (this.shouldReconnect(event.code)) {
this.scheduleReconnect();
}
};
}

shouldReconnect(code) {
// 1000: 正常关闭,不需要重连
// 1001: 端点离开,不需要重连
// 1008: 策略违规,可能需要重新认证
// 其他情况通常需要重连
if (code === 1000 || code === 1001) {
return false;
}
return this.reconnectAttempts < this.options.maxReconnectAttempts;
}

scheduleReconnect() {
const delay = this.backoff.nextDelay();
this.reconnectAttempts++;

console.log(`${this.reconnectAttempts} 次重连,${Math.round(delay / 1000)} 秒后执行`);

this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
}

send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(typeof data === 'string' ? data : JSON.stringify(data));
return true;
}

// 连接未打开,加入队列
this.messageQueue.push(data);
return false;
}

flushMessageQueue() {
while (this.messageQueue.length > 0 &&
this.socket.readyState === WebSocket.OPEN) {
const data = this.messageQueue.shift();
this.send(data);
}
}

on(event, handler) {
if (!this.eventHandlers.has(event)) {
this.eventHandlers.set(event, []);
}
this.eventHandlers.get(event).push(handler);
}

off(event, handler) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
}

emit(event, data) {
const handlers = this.eventHandlers.get(event);
if (handlers) {
handlers.forEach(handler => handler(data));
}
}

close(code = 1000, reason = 'Normal closure') {
// 手动关闭时不再重连
this.options.reconnectOnClose = false;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.socket) {
this.socket.close(code, reason);
}
}
}

// 使用示例
const client = new RobustWebSocketClient('wss://example.com/ws', {
maxReconnectAttempts: 10
});

client.on('message', (event) => {
console.log('收到消息:', event.data);
});

client.on('open', () => {
console.log('连接已建立');
});

client.send({ type: 'hello' }); // 即使未连接也会排队等待

大规模部署考量

当 WebSocket 应用需要支持大量并发连接时,架构设计需要特别注意。

连接数估算

在规划服务器容量时,需要对并发连接数进行合理估算:

单台服务器理论最大连接数 = 文件描述符限制 - 系统保留

# Linux 默认文件描述符限制通常为 1024
# 需要调整系统限制:
ulimit -n 65535

# 每个连接占用内存估算:
# 读缓冲区 (默认 64KB) + 写缓冲区 (默认 64KB) + 用户数据
# 约 128KB-256KB 每连接

# 8GB 内存服务器理论连接上限:
# 8GB / 256KB ≈ 32000 连接
# 实际建议预留 50% 余量,约 15000-20000 连接

水平扩展策略

WebSocket 是有状态的,这与无状态的 HTTP 请求不同,需要特殊的扩展策略:

方案一:粘性会话(Sticky Sessions)

负载均衡器根据客户端 IP 或 Cookie 将请求路由到同一后端服务器。优点是简单易实现,缺点是可能导致负载不均衡。

# Nginx 粘性会话配置
upstream websocket {
ip_hash;
server 10.0.0.1:8080;
server 10.0.0.2:8080;
server 10.0.0.3:8080;
}

方案二:消息广播(Pub/Sub)

使用 Redis、Kafka 或专门的消息代理在服务器间同步消息。每台服务器订阅消息频道,收到消息后广播给本地连接。

// Node.js 使用 Redis 发布订阅
const redis = require('redis');
const publisher = redis.createClient();
const subscriber = redis.createClient();

subscriber.subscribe('chat:messages');

subscriber.on('message', (channel, message) => {
// 广播给本服务器的所有连接
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
});

// 收到客户端消息时发布到 Redis
ws.on('message', (data) => {
publisher.publish('chat:messages', data);
});

方案三:专用消息服务

对于超大规模应用,可以使用专门的消息服务如:

  • Redis Streams:支持消费者组和消息持久化
  • Apache Kafka:高吞吐量、持久化消息队列
  • NATS:轻量级消息系统
  • RabbitMQ:功能丰富的消息代理

连接迁移与优雅关闭

在进行服务更新或扩缩容时,需要实现连接的平滑迁移:

// 服务端优雅关闭示例
async function gracefulShutdown() {
console.log('开始优雅关闭...');

// 1. 停止接受新连接
wss.close();

// 2. 通知所有客户端即将关闭
const shutdownMessage = JSON.stringify({
type: 'shutdown',
reason: 'server_maintenance',
retryAfter: 30 // 秒
});

wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(shutdownMessage);
}
});

// 3. 等待客户端断开或超时
await new Promise(resolve => {
const checkInterval = setInterval(() => {
if (wss.clients.size === 0) {
clearInterval(checkInterval);
resolve();
}
}, 1000);

// 最多等待 30 秒
setTimeout(() => {
clearInterval(checkInterval);
resolve();
}, 30000);
});

// 4. 强制关闭剩余连接
wss.clients.forEach(client => {
client.terminate();
});

console.log('所有连接已关闭');
process.exit(0);
}

process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);

跨数据中心部署

对于全球用户,可能需要跨数据中心部署:

跨数据中心部署需要考虑:

  • 延迟:跨区域消息同步会增加延迟
  • 一致性:需要选择合适的一致性级别
  • 故障转移:单个数据中心故障时的处理策略

小结

本章全面总结了构建生产级 WebSocket 应用的最佳实践:

连接管理

  • 使用连接池有效管理大量并发连接
  • 实现连接状态追踪,记录连接的生命周期
  • 设计合理的连接超时和淘汰策略

心跳机制

  • 客户端实现应用层心跳检测连接状态
  • 服务端使用 Ping/Pong 帧检测断开的连接
  • 结合指数退避算法处理重连

错误处理

  • 客户端实现健壮的重连策略和消息队列
  • 服务端正确处理各类异常情况
  • 通过关闭码快速定位问题原因

安全考虑

  • 生产环境必须使用 WSS 加密传输
  • 实现 Origin 验证防止 CSWSH 攻击
  • 选择合适的认证方案(Cookie、URL 参数、首条消息认证)
  • 实现速率限制和消息验证

性能优化

  • 根据数据类型决定是否启用压缩
  • 批量处理消息减少系统调用
  • 合理分组连接提高广播效率

监控与日志

  • 监控关键指标(连接数、吞吐量、延迟、错误率)
  • 实现告警机制及时发现问题
  • 使用结构化日志便于排查问题

大规模部署

  • 使用 Redis 等消息代理实现跨服务器通信
  • 实现优雅关闭确保消息不丢失
  • 跨数据中心部署需要权衡延迟和一致性

遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用,满足生产环境的严苛要求。