跳到主要内容

最佳实践

构建生产级别的 WebSocket 应用需要考虑连接管理、错误处理、安全性和性能优化等多个方面。本章总结 WebSocket 开发的最佳实践。

连接管理

连接池管理

对于高并发场景,需要有效管理连接池:

class ConnectionPool {
constructor(maxConnections = 10000) {
this.connections = new Map();
this.maxConnections = maxConnections;
}

add(id, connection) {
if (this.connections.size >= this.maxConnections) {
this.evictOldest();
}
this.connections.set(id, {
connection,
lastActivity: Date.now(),
metadata: {}
});
}

evictOldest() {
let oldest = null;
let oldestTime = Infinity;

for (const [id, data] of this.connections) {
if (data.lastActivity < oldestTime) {
oldestTime = data.lastActivity;
oldest = id;
}
}

if (oldest) {
const conn = this.connections.get(oldest);
conn.connection.close(1001, '连接超时');
this.connections.delete(oldest);
}
}

updateActivity(id) {
const conn = this.connections.get(id);
if (conn) {
conn.lastActivity = Date.now();
}
}
}

连接状态追踪

class ConnectionState {
constructor() {
this.state = 'disconnected';
this.retryCount = 0;
this.lastConnected = null;
this.lastDisconnected = null;
}

connect() {
this.state = 'connecting';
}

connected() {
this.state = 'connected';
this.retryCount = 0;
this.lastConnected = Date.now();
}

disconnect() {
this.state = 'disconnected';
this.lastDisconnected = Date.now();
}

shouldReconnect() {
if (this.state === 'connected') return false;
if (this.retryCount >= 5) return false;
return true;
}

getRetryDelay() {
const baseDelay = 1000;
const maxDelay = 30000;
const delay = Math.min(baseDelay * Math.pow(2, this.retryCount), maxDelay);
return delay + Math.random() * 1000;
}
}

心跳机制

心跳机制对于检测断开的连接至关重要。

客户端心跳

class HeartbeatManager {
constructor(socket, options = {}) {
this.socket = socket;
this.interval = options.interval || 30000;
this.timeout = options.timeout || 5000;
this.maxMissed = options.maxMissed || 3;

this.missedHeartbeats = 0;
this.timer = null;
this.timeoutTimer = null;
}

start() {
this.stop();
this.timer = setInterval(() => this.sendPing(), this.interval);
}

stop() {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}

sendPing() {
if (this.socket.readyState !== WebSocket.OPEN) {
return;
}

this.missedHeartbeats++;

if (this.missedHeartbeats > this.maxMissed) {
this.stop();
this.socket.close(1001, '心跳超时');
return;
}

this.socket.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));

this.timeoutTimer = setTimeout(() => {
if (this.missedHeartbeats > 0) {
console.warn('心跳响应超时');
}
}, this.timeout);
}

onPong() {
this.missedHeartbeats = 0;
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}
}

服务端心跳

function setupHeartbeat(wss, interval = 30000) {
const clients = new Map();

wss.on('connection', (ws) => {
clients.set(ws, { isAlive: true, lastPong: Date.now() });

ws.on('pong', () => {
const client = clients.get(ws);
if (client) {
client.isAlive = true;
client.lastPong = Date.now();
}
});

ws.on('close', () => {
clients.delete(ws);
});
});

const timer = setInterval(() => {
wss.clients.forEach((ws) => {
const client = clients.get(ws);

if (!client || !client.isAlive) {
ws.terminate();
clients.delete(ws);
return;
}

client.isAlive = false;
ws.ping();
});
}, interval);

wss.on('close', () => {
clearInterval(timer);
});
}

错误处理

客户端错误处理

class RobustWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxRetries: 5,
retryDelay: 1000,
maxRetryDelay: 30000,
...options
};

this.socket = null;
this.retryCount = 0;
this.forcedClose = false;
this.messageQueue = [];

this.handlers = {
open: [],
message: [],
error: [],
close: []
};

this.connect();
}

connect() {
try {
this.socket = new WebSocket(this.url);

this.socket.onopen = (event) => {
this.retryCount = 0;
this.flushQueue();
this.emit('open', event);
};

this.socket.onmessage = (event) => {
this.emit('message', event);
};

this.socket.onerror = (event) => {
this.emit('error', event);
};

this.socket.onclose = (event) => {
this.emit('close', event);

if (!this.forcedClose && this.shouldReconnect()) {
this.scheduleReconnect();
}
};
} catch (error) {
this.emit('error', error);
if (this.shouldReconnect()) {
this.scheduleReconnect();
}
}
}

shouldReconnect() {
return this.retryCount < this.options.maxRetries;
}

scheduleReconnect() {
this.retryCount++;
const delay = Math.min(
this.options.retryDelay * Math.pow(2, this.retryCount - 1),
this.options.maxRetryDelay
);

setTimeout(() => this.connect(), delay);
}

send(data) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(data);
} else {
this.messageQueue.push(data);
}
}

flushQueue() {
while (this.messageQueue.length > 0) {
const data = this.messageQueue.shift();
this.socket.send(data);
}
}

close(code = 1000, reason = 'Normal closure') {
this.forcedClose = true;
if (this.socket) {
this.socket.close(code, reason);
}
}

on(event, handler) {
if (this.handlers[event]) {
this.handlers[event].push(handler);
}
}

emit(event, data) {
this.handlers[event]?.forEach(handler => handler(data));
}
}

服务端错误处理

function handleConnection(ws, req) {
let isAuthenticated = false;

try {
isAuthenticated = authenticateConnection(req);
} catch (error) {
ws.close(1008, 'Authentication failed');
return;
}

if (!isAuthenticated) {
ws.close(1008, 'Unauthorized');
return;
}

ws.on('message', (data) => {
try {
const message = JSON.parse(data);
handleMessage(ws, message);
} catch (error) {
if (error instanceof SyntaxError) {
ws.send(JSON.stringify({ error: 'Invalid JSON' }));
} else {
console.error('处理消息错误:', error);
ws.send(JSON.stringify({ error: 'Internal error' }));
}
}
});

ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});

ws.on('close', (code, reason) => {
console.log(`连接关闭: ${code} - ${reason}`);
cleanupConnection(ws);
});
}

安全考虑

认证机制

const jwt = require('jsonwebtoken');

function authenticateConnection(req) {
const url = new URL(req.url, `http://${req.headers.host}`);
const token = url.searchParams.get('token');

if (!token) {
return false;
}

try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
return true;
} catch (error) {
return false;
}
}

const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const authenticated = authenticateConnection(info.req);
callback(authenticated, authenticated ? 200 : 401);
}
});

Origin 验证

const allowedOrigins = [
'https://example.com',
'https://app.example.com'
];

const wss = new WebSocket.Server({
server,
verifyClient: (info, callback) => {
const origin = info.origin;

if (!allowedOrigins.includes(origin)) {
callback(false, 403, 'Forbidden');
return;
}

callback(true);
}
});

速率限制

class RateLimiter {
constructor(maxMessages = 100, windowMs = 60000) {
this.limits = new Map();
this.maxMessages = maxMessages;
this.windowMs = windowMs;
}

check(clientId) {
const now = Date.now();
let limit = this.limits.get(clientId);

if (!limit || now - limit.windowStart > this.windowMs) {
limit = { count: 0, windowStart: now };
this.limits.set(clientId, limit);
}

limit.count++;

return limit.count <= this.maxMessages;
}

remaining(clientId) {
const limit = this.limits.get(clientId);
if (!limit) return this.maxMessages;
return Math.max(0, this.maxMessages - limit.count);
}
}

const limiter = new RateLimiter(100, 60000);

ws.on('message', (data) => {
if (!limiter.check(ws.id)) {
ws.send(JSON.stringify({ error: 'Rate limit exceeded' }));
ws.close(1008, 'Rate limit exceeded');
return;
}

handleMessage(data);
});

消息验证

const Joi = require('joi');

const messageSchema = Joi.object({
type: Joi.string().valid('chat', 'join', 'leave').required(),
content: Joi.string().max(1000).when('type', {
is: 'chat',
then: Joi.required()
}),
room: Joi.string().max(50).when('type', {
is: 'join',
then: Joi.required()
})
});

function validateMessage(data) {
const { error, value } = messageSchema.validate(data);
if (error) {
throw new Error(`Validation error: ${error.message}`);
}
return value;
}

ws.on('message', (rawData) => {
try {
const data = JSON.parse(rawData);
const validatedData = validateMessage(data);
handleMessage(ws, validatedData);
} catch (error) {
ws.send(JSON.stringify({ error: error.message }));
}
});

性能优化

消息压缩

const WebSocket = require('ws');

const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
clientNoContextTakeover: true,
serverNoContextTakeover: true
}
});

批量处理

class MessageBatcher {
constructor(sendCallback, options = {}) {
this.sendCallback = sendCallback;
this.batchSize = options.batchSize || 100;
this.flushInterval = options.flushInterval || 100;

this.batch = [];
this.timer = null;
}

add(message) {
this.batch.push(message);

if (this.batch.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.flushInterval);
}
}

flush() {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}

if (this.batch.length === 0) return;

const messages = this.batch;
this.batch = [];

this.sendCallback(messages);
}
}

const batcher = new MessageBatcher((messages) => {
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({ batch: messages }));
}
});
});

连接分组

class RoomManager {
constructor() {
this.rooms = new Map();
this.clientRooms = new Map();
}

join(roomId, clientId, connection) {
if (!this.rooms.has(roomId)) {
this.rooms.set(roomId, new Map());
}

this.rooms.get(roomId).set(clientId, connection);
this.clientRooms.set(clientId, roomId);
}

leave(clientId) {
const roomId = this.clientRooms.get(clientId);
if (roomId) {
this.rooms.get(roomId)?.delete(clientId);
this.clientRooms.delete(clientId);
}
}

broadcast(roomId, message, excludeClient = null) {
const room = this.rooms.get(roomId);
if (!room) return;

const data = JSON.stringify(message);

for (const [clientId, connection] of room) {
if (clientId !== excludeClient && connection.readyState === WebSocket.OPEN) {
connection.send(data);
}
}
}
}

监控与日志

连接监控

class WebSocketMonitor {
constructor() {
this.stats = {
totalConnections: 0,
activeConnections: 0,
messagesReceived: 0,
messagesSent: 0,
errors: 0
};

this.startTime = Date.now();
}

onConnect() {
this.stats.totalConnections++;
this.stats.activeConnections++;
}

onDisconnect() {
this.stats.activeConnections--;
}

onMessageReceived() {
this.stats.messagesReceived++;
}

onMessageSent() {
this.stats.messagesSent++;
}

onError() {
this.stats.errors++;
}

getStats() {
return {
...this.stats,
uptime: Date.now() - this.startTime,
uptimeSeconds: Math.floor((Date.now() - this.startTime) / 1000)
};
}
}

const monitor = new WebSocketMonitor();

wss.on('connection', (ws) => {
monitor.onConnect();

ws.on('message', (data) => {
monitor.onMessageReceived();
});

ws.on('close', () => {
monitor.onDisconnect();
});

ws.on('error', () => {
monitor.onError();
});
});

setInterval(() => {
console.log('WebSocket 统计:', monitor.getStats());
}, 60000);

结构化日志

const winston = require('winston');

const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'websocket.log' })
]
});

function logConnection(ws, action, data = {}) {
logger.info({
type: 'websocket',
action: action,
clientId: ws.id,
ip: ws._socket?.remoteAddress,
...data
});
}

wss.on('connection', (ws, req) => {
ws.id = generateId();
logConnection(ws, 'connect');

ws.on('message', (data) => {
logConnection(ws, 'message', { size: data.length });
});

ws.on('close', (code, reason) => {
logConnection(ws, 'disconnect', { code, reason: reason.toString() });
});
});

小结

本章总结了 WebSocket 开发的最佳实践:

  1. 连接管理:连接池、状态追踪、优雅关闭
  2. 心跳机制:客户端和服务端心跳实现
  3. 错误处理:重连机制、错误恢复
  4. 安全考虑:认证、Origin 验证、速率限制、消息验证
  5. 性能优化:压缩、批量处理、连接分组
  6. 监控日志:统计信息、结构化日志

遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用。