最佳实践
构建生产级别的 WebSocket 应用需要考虑连接管理、错误处理、安全性和性能优化等多个方面。本章总结 WebSocket 开发的最佳实践。
连接管理
连接池管理
对于高并发场景,需要有效管理连接池:
/**
 * Bounded registry of live connections keyed by a caller-supplied id.
 *
 * Each entry records the connection object, the time it was last active and a
 * free-form metadata bag. When the pool is full, the least recently active
 * entry is closed and removed to make room for a new one.
 */
class ConnectionPool {
  /**
   * @param {number} [maxConnections=10000] Maximum number of tracked connections.
   */
  constructor(maxConnections = 10000) {
    this.connections = new Map();
    this.maxConnections = maxConnections;
  }

  /**
   * Registers (or replaces) the connection stored under `id`.
   *
   * @param {*} id Key for the connection; any Map key is accepted.
   * @param {{ close: Function }} connection Object exposing `close(code, reason)`.
   */
  add(id, connection) {
    // Replacing an existing id does not grow the pool, so evicting another
    // (healthy) connection in that case would be a needless disconnect.
    const growsPool = !this.connections.has(id);
    if (growsPool && this.connections.size >= this.maxConnections) {
      this.evictOldest();
    }
    this.connections.set(id, {
      connection,
      lastActivity: Date.now(),
      metadata: {},
    });
  }

  /**
   * Closes and removes the entry with the smallest `lastActivity`.
   * Does nothing when the pool is empty.
   */
  evictOldest() {
    let oldestId;
    let oldestEntry = null;
    for (const [id, entry] of this.connections) {
      if (oldestEntry === null || entry.lastActivity < oldestEntry.lastActivity) {
        oldestId = id;
        oldestEntry = entry;
      }
    }
    // Test the entry, not the id: ids such as 0 or '' are falsy yet valid keys,
    // and a truthiness check on the id would silently skip their eviction.
    if (oldestEntry === null) {
      return;
    }
    // Remove first so the pool stays consistent even if close() throws.
    this.connections.delete(oldestId);
    oldestEntry.connection.close(1001, '连接超时');
  }

  /**
   * Marks the connection stored under `id` as active now; unknown ids are ignored.
   *
   * @param {*} id Key previously passed to `add`.
   */
  updateActivity(id) {
    const entry = this.connections.get(id);
    if (entry) {
      entry.lastActivity = Date.now();
    }
  }
}
连接状态追踪
/**
 * Small state holder for one client connection: the current phase
 * ('disconnected' | 'connecting' | 'connected'), a retry counter, and the
 * timestamps of the last connect / disconnect transitions.
 */
class ConnectionState {
  constructor() {
    Object.assign(this, {
      state: 'disconnected',
      retryCount: 0,
      lastConnected: null,
      lastDisconnected: null,
    });
  }

  /** Marks that a connection attempt has started. */
  connect() {
    this.state = 'connecting';
  }

  /** Marks the connection as established and clears the retry counter. */
  connected() {
    this.state = 'connected';
    this.retryCount = 0;
    this.lastConnected = Date.now();
  }

  /** Marks the connection as lost and records when that happened. */
  disconnect() {
    this.state = 'disconnected';
    this.lastDisconnected = Date.now();
  }

  /**
   * @returns {boolean} false while connected or once retryCount has reached 5.
   */
  shouldReconnect() {
    const isConnected = this.state === 'connected';
    const retriesExhausted = this.retryCount >= 5;
    return !isConnected && !retriesExhausted;
  }

  /**
   * @returns {number} Exponential back-off in ms (1000 * 2^retryCount, capped
   *   at 30000) plus up to 1000 ms of random jitter.
   */
  getRetryDelay() {
    const exponential = 1000 * 2 ** this.retryCount;
    const capped = Math.min(exponential, 30000);
    return capped + Math.random() * 1000;
  }
}
心跳机制
心跳机制对于检测断开的连接至关重要。
客户端心跳
/**
 * Client-side application-level heartbeat.
 *
 * Sends a JSON `{ type: 'ping' }` frame every `interval` ms and counts pings
 * that have not been answered. The owner must call `onPong()` when the
 * matching pong arrives. Once more than `maxMissed` pings are outstanding the
 * socket is closed.
 */
class HeartbeatManager {
  /**
   * @param {WebSocket} socket Socket to probe.
   * @param {object} [options]
   * @param {number} [options.interval=30000] Milliseconds between pings.
   * @param {number} [options.timeout=5000] Milliseconds before a warning is logged for an unanswered ping.
   * @param {number} [options.maxMissed=3] Unanswered pings tolerated before closing.
   * @param {number} [options.closeCode=4000] Close code used on heartbeat timeout.
   */
  constructor(socket, options = {}) {
    this.socket = socket;
    this.interval = options.interval || 30000;
    this.timeout = options.timeout || 5000;
    this.maxMissed = options.maxMissed || 3;
    // Browsers only accept 1000 or 3000-4999 in WebSocket.close(); passing
    // 1001 throws InvalidAccessError, so the dead socket would never be
    // closed. Default to an application-defined code instead.
    this.closeCode = options.closeCode || 4000;
    this.missedHeartbeats = 0;
    this.timer = null;
    this.timeoutTimer = null;
  }

  /** (Re)starts the periodic ping; any previous timers are cancelled first. */
  start() {
    this.stop();
    this.timer = setInterval(() => this.sendPing(), this.interval);
  }

  /** Cancels the periodic ping and any pending response-timeout timer. */
  stop() {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    this.clearResponseTimeout();
  }

  /**
   * Sends one ping, or closes the socket when too many pings are unanswered.
   * Does nothing unless the socket is OPEN.
   */
  sendPing() {
    if (this.socket.readyState !== WebSocket.OPEN) {
      return;
    }
    this.missedHeartbeats++;
    if (this.missedHeartbeats > this.maxMissed) {
      this.stop();
      this.socket.close(this.closeCode, '心跳超时');
      return;
    }
    this.socket.send(JSON.stringify({ type: 'ping', timestamp: Date.now() }));
    // A previous timeout may still be pending when timeout >= interval;
    // cancel it so only one response timer exists and stop() can clear it.
    this.clearResponseTimeout();
    this.timeoutTimer = setTimeout(() => {
      this.timeoutTimer = null;
      if (this.missedHeartbeats > 0) {
        console.warn('心跳响应超时');
      }
    }, this.timeout);
  }

  /** Records a pong: resets the missed counter and cancels the response timer. */
  onPong() {
    this.missedHeartbeats = 0;
    this.clearResponseTimeout();
  }

  /** Cancels the pending response-timeout timer, if any. */
  clearResponseTimeout() {
    if (this.timeoutTimer) {
      clearTimeout(this.timeoutTimer);
      this.timeoutTimer = null;
    }
  }
}
服务端心跳
/**
 * Installs a server-side ping/pong liveness check on a `ws` server.
 *
 * Every `interval` ms each socket in `wss.clients` is pinged. A socket that
 * has not answered the previous ping with a pong (or that was never
 * registered through the 'connection' event) is terminated. The sweep timer
 * is cleared when the server emits 'close'.
 *
 * @param {object} wss Server exposing `on(event, handler)` and a `clients` collection.
 * @param {number} [interval=30000] Milliseconds between sweeps.
 */
function setupHeartbeat(wss, interval = 30000) {
  const liveness = new Map();

  const track = (socket) => {
    liveness.set(socket, { isAlive: true, lastPong: Date.now() });

    socket.on('pong', () => {
      const entry = liveness.get(socket);
      if (!entry) {
        return;
      }
      entry.isAlive = true;
      entry.lastPong = Date.now();
    });

    socket.on('close', () => {
      liveness.delete(socket);
    });
  };

  const probe = (socket) => {
    const entry = liveness.get(socket);
    if (entry && entry.isAlive) {
      // Assume dead until the pong handler flips the flag back.
      entry.isAlive = false;
      socket.ping();
      return;
    }
    socket.terminate();
    liveness.delete(socket);
  };

  wss.on('connection', (socket) => {
    track(socket);
  });

  const sweepTimer = setInterval(() => {
    wss.clients.forEach((socket) => {
      probe(socket);
    });
  }, interval);

  wss.on('close', () => {
    clearInterval(sweepTimer);
  });
}
错误处理
客户端错误处理
/**
 * WebSocket wrapper that reconnects with exponential back-off and queues
 * outgoing messages while the socket is not open.
 *
 * Listeners registered with `on()` receive the raw 'open', 'message', 'error'
 * and 'close' events of whichever underlying socket is current.
 */
class RobustWebSocket {
  /**
   * Opens the first connection immediately.
   *
   * @param {string} url WebSocket endpoint.
   * @param {object} [options]
   * @param {number} [options.maxRetries=5] Reconnect attempts before giving up.
   * @param {number} [options.retryDelay=1000] Base back-off delay in ms.
   * @param {number} [options.maxRetryDelay=30000] Upper bound for the back-off delay in ms.
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      maxRetries: 5,
      retryDelay: 1000,
      maxRetryDelay: 30000,
      ...options,
    };
    this.socket = null;
    this.retryCount = 0;
    this.forcedClose = false;
    // Handle of the pending reconnect timer, so close() can cancel it.
    this.reconnectTimer = null;
    this.messageQueue = [];
    this.handlers = {
      open: [],
      message: [],
      error: [],
      close: [],
    };
    this.connect();
  }

  /** Creates a new underlying socket and wires its events to this wrapper. */
  connect() {
    try {
      this.socket = new WebSocket(this.url);
      this.socket.onopen = (event) => {
        this.retryCount = 0;
        this.flushQueue();
        this.emit('open', event);
      };
      this.socket.onmessage = (event) => {
        this.emit('message', event);
      };
      this.socket.onerror = (event) => {
        this.emit('error', event);
      };
      this.socket.onclose = (event) => {
        this.emit('close', event);
        if (!this.forcedClose && this.shouldReconnect()) {
          this.scheduleReconnect();
        }
      };
    } catch (error) {
      this.emit('error', error);
      // Honor an explicit close() here as well, matching the onclose path.
      if (!this.forcedClose && this.shouldReconnect()) {
        this.scheduleReconnect();
      }
    }
  }

  /** @returns {boolean} true while the retry budget is not exhausted. */
  shouldReconnect() {
    return this.retryCount < this.options.maxRetries;
  }

  /** Schedules the next connect() using exponential back-off. */
  scheduleReconnect() {
    this.retryCount++;
    const delay = Math.min(
      this.options.retryDelay * Math.pow(2, this.retryCount - 1),
      this.options.maxRetryDelay,
    );
    this.cancelReconnect();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // close() may have been called while the timer was pending.
      if (!this.forcedClose) {
        this.connect();
      }
    }, delay);
  }

  /** Cancels a pending reconnect attempt, if any. */
  cancelReconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Sends `data` now when the socket is open, otherwise queues it until the
   * next successful open.
   *
   * @param {*} data Payload accepted by `WebSocket.send`.
   */
  send(data) {
    if (this.socket && this.socket.readyState === WebSocket.OPEN) {
      this.socket.send(data);
    } else {
      this.messageQueue.push(data);
    }
  }

  /** Sends every queued message in FIFO order. */
  flushQueue() {
    while (this.messageQueue.length > 0) {
      // Dequeue only after a successful send so a throwing send() does not
      // drop the message.
      this.socket.send(this.messageQueue[0]);
      this.messageQueue.shift();
    }
  }

  /**
   * Closes the connection for good: no further reconnects are attempted.
   *
   * @param {number} [code=1000] Close code.
   * @param {string} [reason='Normal closure'] Close reason.
   */
  close(code = 1000, reason = 'Normal closure') {
    this.forcedClose = true;
    // Without this, a reconnect already waiting in its back-off delay would
    // still fire and open a brand-new socket after the caller asked to close.
    this.cancelReconnect();
    if (this.socket) {
      this.socket.close(code, reason);
    }
  }

  /**
   * Registers a listener; unknown event names are ignored.
   *
   * @param {'open'|'message'|'error'|'close'} event Event name.
   * @param {Function} handler Listener invoked with the event payload.
   */
  on(event, handler) {
    if (this.handlers[event]) {
      this.handlers[event].push(handler);
    }
  }

  /**
   * Invokes every listener registered for `event` with `data`.
   *
   * @param {string} event Event name.
   * @param {*} data Payload forwarded to the listeners.
   */
  emit(event, data) {
    this.handlers[event]?.forEach((handler) => handler(data));
  }
}
服务端错误处理
/**
 * Per-connection entry point on the server: authenticates the upgrade
 * request, then wires message, error and close handling for the socket.
 *
 * An unauthenticated connection is closed with code 1008 and receives no
 * handlers. Errors raised while handling a message are reported back to the
 * client as a JSON `{ error }` frame instead of propagating.
 *
 * @param {object} ws Accepted WebSocket.
 * @param {object} req HTTP upgrade request passed to `authenticateConnection`.
 */
function handleConnection(ws, req) {
  const reply = (payload) => ws.send(JSON.stringify(payload));

  let isAuthenticated;
  try {
    isAuthenticated = authenticateConnection(req);
  } catch {
    ws.close(1008, 'Authentication failed');
    return;
  }

  if (!isAuthenticated) {
    ws.close(1008, 'Unauthorized');
    return;
  }

  ws.on('message', (data) => {
    try {
      handleMessage(ws, JSON.parse(data));
    } catch (error) {
      const isMalformedJson = error instanceof SyntaxError;
      if (!isMalformedJson) {
        // Only unexpected failures are logged; bad JSON is the client's fault.
        console.error('处理消息错误:', error);
      }
      reply({ error: isMalformedJson ? 'Invalid JSON' : 'Internal error' });
    }
  });

  ws.on('error', (error) => {
    console.error('WebSocket 错误:', error);
  });

  ws.on('close', (code, reason) => {
    console.log(`连接关闭: ${code} - ${reason}`);
    cleanupConnection(ws);
  });
}
安全考虑
认证机制
const jwt = require('jsonwebtoken');
/**
 * Authenticates a WebSocket upgrade request from the JWT carried in its
 * `token` query parameter.
 *
 * On success the decoded payload is stored on `req.user`. The function fails
 * closed: a missing token, an invalid token, or a request whose URL / Host
 * header cannot be parsed all yield `false` rather than an exception.
 *
 * @param {object} req HTTP upgrade request (`url` and `headers.host` are read).
 * @returns {boolean} true when the token verified.
 */
function authenticateConnection(req) {
  try {
    // URL parsing must live inside the try: `new URL` throws a TypeError on a
    // malformed request target or Host header, and callers such as a
    // `verifyClient` hook invoke this function without their own try/catch.
    const url = new URL(req.url, `http://${req.headers.host}`);
    const token = url.searchParams.get('token');
    if (!token) {
      return false;
    }
    req.user = jwt.verify(token, process.env.JWT_SECRET);
    return true;
  } catch (error) {
    // Deliberately treated as "not authenticated"; the reason is not exposed.
    return false;
  }
}
// Server that runs token authentication during the handshake: the upgrade is
// accepted with status 200 when authenticateConnection() passes and rejected
// with 401 otherwise.
const wss = new WebSocket.Server({
  server,
  verifyClient(info, callback) {
    const isAuthenticated = authenticateConnection(info.req);
    const status = isAuthenticated ? 200 : 401;
    callback(isAuthenticated, status);
  },
});
Origin 验证
// Origins that are allowed to open a WebSocket connection.
const allowedOrigins = ['https://example.com', 'https://app.example.com'];

// Server that rejects the handshake with 403 unless the request's origin is
// on the allow-list above.
const wss = new WebSocket.Server({
  server,
  verifyClient(info, callback) {
    const isAllowed = allowedOrigins.includes(info.origin);
    if (isAllowed) {
      callback(true);
    } else {
      callback(false, 403, 'Forbidden');
    }
  },
});
速率限制
/**
 * Fixed-window message rate limiter keyed by client id.
 *
 * Each client may send up to `maxMessages` messages per `windowMs`
 * milliseconds; the window starts at the client's first message and a new one
 * starts with the first message after the previous window has elapsed.
 */
class RateLimiter {
  /**
   * @param {number} [maxMessages=100] Messages allowed per window.
   * @param {number} [windowMs=60000] Window length in milliseconds.
   */
  constructor(maxMessages = 100, windowMs = 60000) {
    this.limits = new Map();
    this.maxMessages = maxMessages;
    this.windowMs = windowMs;
  }

  /**
   * Counts one message for `clientId`.
   *
   * @param {*} clientId Client key.
   * @returns {boolean} true when the message is within the limit.
   */
  check(clientId) {
    const now = Date.now();
    let limit = this.limits.get(clientId);
    if (!limit || now - limit.windowStart > this.windowMs) {
      limit = { count: 0, windowStart: now };
      this.limits.set(clientId, limit);
    }
    limit.count++;
    return limit.count <= this.maxMessages;
  }

  /**
   * Messages `clientId` may still send in its current window, without
   * counting a message.
   *
   * @param {*} clientId Client key.
   * @returns {number} Remaining allowance, never negative.
   */
  remaining(clientId) {
    const limit = this.limits.get(clientId);
    // An elapsed window is replaced on the next check(), so it must not be
    // reported as exhausted here; otherwise remaining() says 0 while check()
    // would accept the message.
    if (!limit || Date.now() - limit.windowStart > this.windowMs) {
      return this.maxMessages;
    }
    return Math.max(0, this.maxMessages - limit.count);
  }

  /**
   * Forgets `clientId`; call this when the client disconnects so the map does
   * not grow without bound.
   *
   * @param {*} clientId Client key.
   * @returns {boolean} true when an entry was removed.
   */
  reset(clientId) {
    return this.limits.delete(clientId);
  }

  /**
   * Drops every entry whose window has elapsed.
   *
   * @param {number} [now=Date.now()] Reference time in ms.
   * @returns {number} Number of entries removed.
   */
  prune(now = Date.now()) {
    let removed = 0;
    for (const [clientId, limit] of this.limits) {
      if (now - limit.windowStart > this.windowMs) {
        this.limits.delete(clientId);
        removed++;
      }
    }
    return removed;
  }
}
// Shared limiter: at most 100 messages per client per 60 seconds.
const limiter = new RateLimiter(100, 60000);

ws.on('message', (data) => {
  if (!limiter.check(ws.id)) {
    // Tell the client why before closing with a policy-violation code.
    ws.send(JSON.stringify({ error: 'Rate limit exceeded' }));
    ws.close(1008, 'Rate limit exceeded');
    return;
  }
  // Pass the socket along, matching the handleMessage(ws, message) call shape
  // used by the other handlers in this file; without it the handler cannot
  // reply to the sender.
  handleMessage(ws, data);
});

ws.on('close', () => {
  // Drop the per-connection counter so the limiter's map does not keep an
  // entry for every connection ever seen.
  limiter.limits.delete(ws.id);
});
消息验证
const Joi = require('joi');
// Joi schema describing an inbound client message.
const messageSchema = Joi.object({
// `type` is mandatory and must be one of the three listed values.
type: Joi.string().valid('chat', 'join', 'leave').required(),
// `content` is a string of at most 1000 characters; it becomes required when
// `type` is 'chat' and stays optional otherwise.
content: Joi.string().max(1000).when('type', {
is: 'chat',
then: Joi.required()
}),
// `room` is a string of at most 50 characters; it becomes required when
// `type` is 'join'. Note that 'leave' does not require it.
room: Joi.string().max(50).when('type', {
is: 'join',
then: Joi.required()
})
});
/**
 * Validates a parsed message against `messageSchema`.
 *
 * @param {*} data Parsed message to validate.
 * @returns {*} The value produced by the schema's `validate` call.
 * @throws {Error} With a "Validation error: ..." message when validation fails.
 */
function validateMessage(data) {
  const outcome = messageSchema.validate(data);
  if (!outcome.error) {
    return outcome.value;
  }
  throw new Error(`Validation error: ${outcome.error.message}`);
}
// Parse, validate and dispatch each inbound frame; any failure along the way
// is reported back to the sender as a JSON `{ error }` frame.
ws.on('message', (rawData) => {
  try {
    const parsed = JSON.parse(rawData);
    handleMessage(ws, validateMessage(parsed));
  } catch (failure) {
    const payload = { error: failure.message };
    ws.send(JSON.stringify(payload));
  }
});
性能优化
消息压缩
const WebSocket = require('ws');
// WebSocket server listening on port 8080 with per-message compression
// (the `perMessageDeflate` option) configured explicitly.
const wss = new WebSocket.Server({
port: 8080,
perMessageDeflate: {
zlibDeflateOptions: {
// zlib compression level used when deflating outgoing messages.
// NOTE(review): 3 is presumably chosen to favor CPU time over ratio — confirm.
level: 3
},
zlibInflateOptions: {
// 10 KiB chunk size for inflating incoming messages.
chunkSize: 10 * 1024
},
// NOTE(review): these two flags presumably disable context takeover on each
// side (compression state is not carried across messages), trading ratio
// for lower per-connection memory — confirm against the ws documentation.
clientNoContextTakeover: true,
serverNoContextTakeover: true
}
});
批量处理
/**
 * Collects messages and hands them to `sendCallback` in arrays, either when
 * `batchSize` messages have accumulated or `flushInterval` ms after the first
 * message of a batch, whichever comes first.
 */
class MessageBatcher {
  /**
   * @param {Function} sendCallback Receives each flushed array of messages.
   * @param {object} [options]
   * @param {number} [options.batchSize=100] Messages that trigger an immediate flush.
   * @param {number} [options.flushInterval=100] Maximum wait in ms before a flush.
   */
  constructor(sendCallback, options = {}) {
    const { batchSize, flushInterval } = options;
    this.sendCallback = sendCallback;
    this.batchSize = batchSize || 100;
    this.flushInterval = flushInterval || 100;
    this.batch = [];
    this.timer = null;
  }

  /**
   * Queues one message, flushing at once when the batch is full.
   *
   * @param {*} message Message to queue.
   */
  add(message) {
    this.batch.push(message);

    const isFull = this.batch.length >= this.batchSize;
    if (isFull) {
      this.flush();
      return;
    }

    // Only the first message of a batch arms the timer.
    if (this.timer) {
      return;
    }
    this.timer = setTimeout(() => this.flush(), this.flushInterval);
  }

  /** Cancels the pending timer and delivers the current batch, if non-empty. */
  flush() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }

    const pending = this.batch;
    if (pending.length === 0) {
      return;
    }
    this.batch = [];
    this.sendCallback(pending);
  }
}
// Batcher that broadcasts every flushed batch, wrapped as `{ batch: [...] }`,
// to each client whose socket is currently open.
const batcher = new MessageBatcher((messages) => {
  for (const client of wss.clients) {
    if (client.readyState !== WebSocket.OPEN) {
      continue;
    }
    client.send(JSON.stringify({ batch: messages }));
  }
});
连接分组
/**
 * Groups connections into rooms so messages can be broadcast per room.
 *
 * A client belongs to at most one room at a time: `clientRooms` maps each
 * client id to its current room id, and `rooms` maps each room id to the
 * connections of its members.
 */
class RoomManager {
  constructor() {
    this.rooms = new Map();
    this.clientRooms = new Map();
  }

  /**
   * Puts a client into a room, moving it out of any room it was in before.
   *
   * @param {*} roomId Target room key.
   * @param {*} clientId Client key.
   * @param {object} connection Socket exposing `readyState` and `send`.
   */
  join(roomId, clientId, connection) {
    // clientRooms tracks a single room per client, so a stale membership in
    // the previous room must be removed; otherwise the client keeps receiving
    // that room's broadcasts and can never be removed from it via leave().
    if (this.clientRooms.has(clientId) && this.clientRooms.get(clientId) !== roomId) {
      this.leave(clientId);
    }

    let room = this.rooms.get(roomId);
    if (!room) {
      room = new Map();
      this.rooms.set(roomId, room);
    }
    room.set(clientId, connection);
    this.clientRooms.set(clientId, roomId);
  }

  /**
   * Removes a client from its current room; unknown clients are ignored.
   *
   * @param {*} clientId Client key.
   */
  leave(clientId) {
    // Use has() rather than a truthiness test so room ids such as 0 or ''
    // are handled like any other key.
    if (!this.clientRooms.has(clientId)) {
      return;
    }
    const roomId = this.clientRooms.get(clientId);
    const room = this.rooms.get(roomId);
    if (room) {
      room.delete(clientId);
      // Drop empty rooms so the rooms map does not grow without bound.
      if (room.size === 0) {
        this.rooms.delete(roomId);
      }
    }
    this.clientRooms.delete(clientId);
  }

  /**
   * Sends `message` (JSON-serialized once) to every open connection in a room.
   *
   * @param {*} roomId Room key; unknown rooms are ignored.
   * @param {*} message JSON-serializable payload.
   * @param {*} [excludeClient=null] Client id to skip, typically the sender.
   */
  broadcast(roomId, message, excludeClient = null) {
    const room = this.rooms.get(roomId);
    if (!room) return;
    const data = JSON.stringify(message);
    for (const [clientId, connection] of room) {
      if (clientId !== excludeClient && connection.readyState === WebSocket.OPEN) {
        connection.send(data);
      }
    }
  }
}
监控与日志
连接监控
/**
 * In-memory counters for a WebSocket server: connections (total and active),
 * messages in each direction, and errors, plus the time the monitor was
 * created.
 */
class WebSocketMonitor {
  constructor() {
    this.stats = {
      totalConnections: 0,
      activeConnections: 0,
      messagesReceived: 0,
      messagesSent: 0,
      errors: 0,
    };
    this.startTime = Date.now();
  }

  /** Records a new connection (counts toward both total and active). */
  onConnect() {
    const { stats } = this;
    stats.totalConnections++;
    stats.activeConnections++;
  }

  /** Records that a connection went away. */
  onDisconnect() {
    const { stats } = this;
    stats.activeConnections--;
  }

  /** Records one inbound message. */
  onMessageReceived() {
    const { stats } = this;
    stats.messagesReceived++;
  }

  /** Records one outbound message. */
  onMessageSent() {
    const { stats } = this;
    stats.messagesSent++;
  }

  /** Records one error. */
  onError() {
    const { stats } = this;
    stats.errors++;
  }

  /**
   * @returns {object} Copy of the counters plus `uptime` (ms) and
   *   `uptimeSeconds` (whole seconds) since construction.
   */
  getStats() {
    const snapshot = { ...this.stats };
    snapshot.uptime = Date.now() - this.startTime;
    snapshot.uptimeSeconds = Math.floor((Date.now() - this.startTime) / 1000);
    return snapshot;
  }
}
// Single monitor instance fed by the server's connection events.
const monitor = new WebSocketMonitor();

wss.on('connection', (ws) => {
  monitor.onConnect();

  ws.on('message', () => {
    monitor.onMessageReceived();
  });
  ws.on('close', () => {
    monitor.onDisconnect();
  });
  ws.on('error', () => {
    monitor.onError();
  });
});

// Print a statistics snapshot once a minute.
setInterval(() => {
  const snapshot = monitor.getStats();
  console.log('WebSocket 统计:', snapshot);
}, 60000);
结构化日志
const winston = require('winston');
// Shared winston logger for WebSocket events: configured at level 'info',
// with each entry timestamped and formatted as JSON, and written to the file
// `websocket.log` (the only transport).
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.File({ filename: 'websocket.log' })
]
});
/**
 * Writes one structured 'info' log entry for a WebSocket event.
 *
 * @param {object} ws Socket; its `id` and `_socket.remoteAddress` (when present) are logged.
 * @param {string} action Event name stored in the entry's `action` field.
 * @param {object} [data={}] Extra fields merged last, so they override the base fields on a key clash.
 */
function logConnection(ws, action, data = {}) {
  const entry = {
    type: 'websocket',
    action,
    clientId: ws.id,
    ip: ws._socket?.remoteAddress,
    ...data,
  };
  logger.info(entry);
}
// Give every connection an id and log its connect, message and disconnect
// events through logConnection().
wss.on('connection', (ws, req) => {
  ws.id = generateId();
  logConnection(ws, 'connect');

  ws.on('message', (data) => {
    const details = { size: data.length };
    logConnection(ws, 'message', details);
  });

  ws.on('close', (code, reason) => {
    const details = { code, reason: reason.toString() };
    logConnection(ws, 'disconnect', details);
  });
});
小结
本章总结了 WebSocket 开发的最佳实践:
- 连接管理:连接池、状态追踪、最久未活跃连接的淘汰
- 心跳机制:客户端和服务端心跳实现
- 错误处理:重连机制、错误恢复
- 安全考虑:认证、Origin 验证、速率限制、消息验证
- 性能优化:压缩、批量处理、连接分组
- 监控日志:统计信息、结构化日志
遵循这些最佳实践,可以构建稳定、安全、高性能的 WebSocket 应用。