跳到主要内容

Node.js 服务端实现

Node.js 是构建 WebSocket 服务器的热门选择,得益于其事件驱动的非阻塞 I/O 模型,非常适合处理大量并发连接。本章介绍两种主流的 Node.js WebSocket 库:ws 和 Socket.IO。

使用 ws 库

ws 是 Node.js 中最流行、性能最高的 WebSocket 库,它完全遵循 WebSocket 协议标准,API 简洁高效。

安装

npm install ws

基本服务器

创建一个简单的 WebSocket 服务器:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws, request) => {
console.log('新客户端连接');

ws.on('message', (data) => {
console.log('收到消息:', data.toString());
ws.send('服务器收到: ' + data);
});

ws.on('close', (code, reason) => {
console.log('客户端断开:', code, reason.toString());
});

ws.on('error', (error) => {
console.error('连接错误:', error);
});

ws.send('欢迎连接 WebSocket 服务器');
});

console.log('WebSocket 服务器运行在 ws://localhost:8080');

配置选项

WebSocket.Server 支持多种配置选项:

const wss = new WebSocket.Server({
port: 8080,
host: '0.0.0.0',
backlog: 511,
maxPayload: 100 * 1024 * 1024,
clientTracking: true,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
clientNoContextTakeover: true,
serverNoContextTakeover: true,
clientMaxWindowBits: 10,
serverMaxWindowBits: 10
}
});

主要配置说明

  • port/host:监听端口和地址
  • maxPayload:最大消息大小(字节)
  • clientTracking:是否跟踪客户端连接
  • perMessageDeflate:消息压缩配置

结合 HTTP 服务器

通常 WebSocket 服务器与 HTTP 服务器共享端口:

const http = require('http');
const WebSocket = require('ws');
const express = require('express');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.get('/', (req, res) => {
res.send('HTTP 服务');
});

wss.on('connection', (ws, request) => {
const ip = request.socket.remoteAddress;
console.log('客户端 IP:', ip);

ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});

server.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

获取客户端信息

wss.on('connection', (ws, request) => {
const ip = request.socket.remoteAddress;
const port = request.socket.remotePort;
const headers = request.headers;
const url = request.url;
const cookies = request.headers.cookie;

console.log('客户端地址:', `${ip}:${port}`);
console.log('请求路径:', url);
console.log('Origin:', request.headers.origin);
});

验证客户端连接

使用 verifyClient 回调验证连接请求:

const wss = new WebSocket.Server({
port: 8080,
verifyClient: (info, callback) => {
const origin = info.origin;
const allowedOrigins = ['http://localhost:3000', 'https://example.com'];

if (!allowedOrigins.includes(origin)) {
callback(false, 403, 'Forbidden');
return;
}

const token = new URL(info.req.url, 'http://localhost').searchParams.get('token');
if (!validateToken(token)) {
callback(false, 401, 'Unauthorized');
return;
}

callback(true);
}
});

function validateToken(token) {
return token === 'secret-token';
}

发送消息

ws 库支持发送多种类型的数据:

wss.on('connection', (ws) => {
ws.send('文本消息');

ws.send(Buffer.from('Buffer 数据'));

ws.send(JSON.stringify({ type: 'data', value: 123 }));

ws.send(new Uint8Array([1, 2, 3, 4]));

ws.send('二进制数据', { binary: true });

ws.send('压缩消息', { compress: true });
});

广播消息

向所有客户端广播消息:

function broadcast(wss, message) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}

wss.on('connection', (ws) => {
ws.on('message', (data) => {
broadcast(wss, data.toString());
});
});

排除发送者广播:

wss.on('connection', (ws) => {
ws.on('message', (data) => {
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
});

连接管理

实现连接管理器来跟踪所有连接:

class ConnectionManager {
constructor() {
this.connections = new Map();
}

add(id, ws) {
this.connections.set(id, ws);
console.log(`连接 ${id} 已添加,当前连接数: ${this.connections.size}`);
}

remove(id) {
this.connections.delete(id);
console.log(`连接 ${id} 已移除,当前连接数: ${this.connections.size}`);
}

get(id) {
return this.connections.get(id);
}

sendTo(id, message) {
const ws = this.connections.get(id);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
return true;
}
return false;
}

broadcast(message, excludeId = null) {
this.connections.forEach((ws, id) => {
if (id !== excludeId && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}

get size() {
return this.connections.size;
}
}

const manager = new ConnectionManager();
let connectionId = 0;

wss.on('connection', (ws) => {
const id = ++connectionId;
manager.add(id, ws);

ws.on('message', (data) => {
const message = JSON.parse(data);
manager.broadcast(JSON.stringify({
from: id,
...message
}), id);
});

ws.on('close', () => {
manager.remove(id);
});
});

心跳检测

实现服务器端心跳检测:

function heartbeat() {
this.isAlive = true;
}

wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', heartbeat);
});

const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}

ws.isAlive = false;
ws.ping();
});
}, 30000);

wss.on('close', () => {
clearInterval(interval);
});

聊天室示例

完整的聊天室实现:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

class ChatRoom {
constructor(name) {
this.name = name;
this.clients = new Set();
this.history = [];
}

join(ws) {
this.clients.add(ws);
this.history.slice(-50).forEach(msg => {
ws.send(JSON.stringify(msg));
});
this.broadcast({
type: 'system',
content: `用户加入聊天室,当前 ${this.clients.size}`
}, ws);
}

leave(ws) {
this.clients.delete(ws);
this.broadcast({
type: 'system',
content: `用户离开聊天室,当前 ${this.clients.size}`
});
}

broadcast(message, exclude = null) {
const data = JSON.stringify({
...message,
timestamp: Date.now()
});

this.history.push(JSON.parse(data));
if (this.history.length > 100) {
this.history.shift();
}

this.clients.forEach(client => {
if (client !== exclude && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
}

const rooms = new Map();

function getRoom(name) {
if (!rooms.has(name)) {
rooms.set(name, new ChatRoom(name));
}
return rooms.get(name);
}

wss.on('connection', (ws, request) => {
const url = new URL(request.url, 'http://localhost');
const roomName = url.pathname.slice(1) || 'general';
const username = url.searchParams.get('username') || '匿名';

const room = getRoom(roomName);
ws.username = username;
ws.room = room;

room.join(ws);

ws.on('message', (data) => {
try {
const message = JSON.parse(data);
room.broadcast({
type: 'message',
username: ws.username,
content: message.content
});
} catch (e) {
ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
}
});

ws.on('close', () => {
room.leave(ws);
});
});

console.log('聊天服务器运行在 ws://localhost:8080');

使用 Socket.IO

Socket.IO 是一个功能丰富的实时通信库,它在 WebSocket 基础上提供了更多功能,包括自动降级、房间管理、命名空间等。

安装

npm install socket.io

基本服务器

const http = require('http');
const { Server } = require('socket.io');

const server = http.createServer();
const io = new Server(server, {
cors: {
origin: 'http://localhost:3000',
methods: ['GET', 'POST']
}
});

io.on('connection', (socket) => {
console.log('用户连接:', socket.id);

socket.on('message', (data) => {
console.log('收到消息:', data);
socket.emit('response', '服务器收到: ' + data);
});

socket.on('disconnect', (reason) => {
console.log('用户断开:', socket.id, reason);
});
});

server.listen(8080, () => {
console.log('Socket.IO 服务器运行在端口 8080');
});

客户端使用

<script src="/socket.io/socket.io.js"></script>
<script>
const socket = io('http://localhost:8080');

socket.on('connect', () => {
console.log('已连接');
});

socket.emit('message', 'Hello');

socket.on('response', (data) => {
console.log('收到响应:', data);
});
</script>

房间管理

Socket.IO 内置房间功能:

io.on('connection', (socket) => {
socket.on('join-room', (roomName) => {
socket.join(roomName);
socket.emit('joined', roomName);
socket.to(roomName).emit('user-joined', socket.id);
});

socket.on('leave-room', (roomName) => {
socket.leave(roomName);
socket.to(roomName).emit('user-left', socket.id);
});

socket.on('room-message', (roomName, message) => {
io.to(roomName).emit('room-message', {
from: socket.id,
message: message
});
});

socket.on('disconnecting', () => {
const rooms = socket.rooms;
rooms.forEach(room => {
socket.to(room).emit('user-left', socket.id);
});
});
});

命名空间

使用命名空间分隔不同的通信通道:

const chatNamespace = io.of('/chat');
chatNamespace.on('connection', (socket) => {
console.log('聊天连接:', socket.id);

socket.on('message', (data) => {
chatNamespace.emit('message', data);
});
});

const notificationNamespace = io.of('/notifications');
notificationNamespace.on('connection', (socket) => {
console.log('通知连接:', socket.id);

socket.on('subscribe', (userId) => {
socket.join(`user-${userId}`);
});
});

function sendNotification(userId, notification) {
notificationNamespace.to(`user-${userId}`).emit('notification', notification);
}

中间件

使用中间件进行认证:

io.use((socket, next) => {
const token = socket.handshake.auth.token;

if (!token) {
return next(new Error('认证失败'));
}

try {
const user = verifyToken(token);
socket.user = user;
next();
} catch (err) {
next(new Error('无效的 token'));
}
});

io.on('connection', (socket) => {
console.log('用户已认证:', socket.user);
});

广播事件

io.on('connection', (socket) => {
socket.emit('welcome', '欢迎连接');

socket.broadcast.emit('user-connected', socket.id);

io.emit('announcement', '服务器公告');

io.to('room1').emit('room-message', '房间消息');

socket.to('room1').emit('room-message', '排除发送者');

io.except('room1').emit('broadcast', '排除特定房间');
});

Express 集成

ws 与 Express 集成

const express = require('express');
const http = require('http');
const WebSocket = require('ws');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.use(express.json());
app.use(express.static('public'));

app.get('/api/status', (req, res) => {
res.json({
connections: wss.clients.size,
status: 'running'
});
});

wss.on('connection', (ws, request) => {
ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});

server.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

Socket.IO 与 Express 集成

const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: 'http://localhost:3000'
}
});

app.get('/', (req, res) => {
res.sendFile(__dirname + '/index.html');
});

app.get('/api/rooms', (req, res) => {
const rooms = io.sockets.adapter.rooms;
res.json(Array.from(rooms.keys()));
});

io.on('connection', (socket) => {
socket.on('message', (data) => {
io.emit('message', data);
});
});

httpServer.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

生产环境部署

使用 PM2 集群模式

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} 退出`);
cluster.fork();
});
} else {
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});
}

使用 Redis 适配器

对于多进程/多服务器部署,使用 Redis 同步消息:

const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

const io = new Server(8080, {
adapter: createAdapter(pubClient, subClient)
});

io.on('connection', (socket) => {
socket.on('message', (data) => {
io.emit('message', data);
});
});

Nginx 配置

upstream websocket {
server 127.0.0.1:8080;
}

server {
listen 80;
server_name example.com;

location / {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 86400;
}
}

高级功能

noServer 模式

noServer 模式允许 WebSocket 服务器完全独立于 HTTP 服务器,适合需要在同一个 HTTP 服务器上处理多个 WebSocket 路径或需要自定义升级逻辑的场景:

const http = require('http');
const WebSocket = require('ws');

// 创建不绑定端口的 WebSocket 服务器
const wss1 = new WebSocket.Server({ noServer: true });
const wss2 = new WebSocket.Server({ noServer: true });

// 创建 HTTP 服务器
const server = http.createServer();

// 监听 upgrade 事件,手动处理升级请求
server.on('upgrade', (request, socket, head) => {
const pathname = new URL(request.url, `http://${request.headers.host}`).pathname;

if (pathname === '/chat') {
// 将请求转发给聊天 WebSocket 服务器
wss1.handleUpgrade(request, socket, head, (ws) => {
wss1.emit('connection', ws, request);
});
} else if (pathname === '/notifications') {
// 将请求转发给通知 WebSocket 服务器
wss2.handleUpgrade(request, socket, head, (ws) => {
wss2.emit('connection', ws, request);
});
} else {
// 不支持的路径,销毁连接
socket.destroy();
}
});

// 聊天服务器处理逻辑
wss1.on('connection', (ws) => {
console.log('聊天连接建立');
ws.on('message', (data) => {
wss1.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
});

// 通知服务器处理逻辑
wss2.on('connection', (ws) => {
console.log('通知连接建立');
});

server.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
console.log('WebSocket 端点: ws://localhost:8080/chat, ws://localhost:8080/notifications');
});

noServer 模式的优势

  • 单一 HTTP 服务器可处理多个 WebSocket 端点
  • 可以在升级前执行自定义认证逻辑
  • 可以与 Express、Fastify 等框架更好地集成

handleProtocols 子协议协商

ws 库支持通过 handleProtocols 选项自定义子协议选择逻辑:

const wss = new WebSocket.Server({
port: 8080,
handleProtocols: (protocols, request) => {
// protocols 是一个 Set,包含客户端请求的所有子协议
console.log('客户端请求的协议:', Array.from(protocols));

// 按优先级选择协议
if (protocols.has('v2.chat.example.com')) {
return 'v2.chat.example.com';
}
if (protocols.has('v1.chat.example.com')) {
return 'v1.chat.example.com';
}

// 没有匹配的协议,返回 false 拒绝连接
// 或者返回一个默认协议
return false;
}
});

wss.on('connection', (ws, request) => {
// 检查最终选择的协议
console.log('选定协议:', ws.protocol);

// 根据协议版本使用不同的处理逻辑
if (ws.protocol === 'v2.chat.example.com') {
handleV2Protocol(ws);
} else {
handleV1Protocol(ws);
}
});

流式处理(createWebSocketStream)

ws 库提供了 createWebSocketStream 函数,将 WebSocket 转换为 Node.js 的 Duplex 流,可以与流处理 API 配合使用:

const WebSocket = require('ws');
const { createWebSocketStream } = require('ws');
const fs = require('fs');

// 服务端:接收文件流
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
// 将 WebSocket 转换为 Duplex 流
const stream = createWebSocketStream(ws, { encoding: 'utf8' });

// 将接收到的数据写入文件
const writeStream = fs.createWriteStream('./received.txt');
stream.pipe(writeStream);

stream.on('error', (error) => {
console.error('流错误:', error);
});
});

// 客户端:发送文件流
const ws = new WebSocket('ws://localhost:8080');
const stream = createWebSocketStream(ws);

// 从文件读取并通过 WebSocket 发送
const readStream = fs.createReadStream('./large-file.txt');
readStream.pipe(stream);

readStream.on('end', () => {
console.log('文件发送完成');
ws.close();
});

流式处理的优势

  • 背压支持:自动处理数据生产和消费的不平衡
  • 内存效率:无需将整个文件加载到内存
  • 可组合性:可以与 Node.js 流生态(压缩、加密等)无缝集成

错误码处理

ws 库定义了特定的错误码,帮助诊断协议层面的问题:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
ws.on('error', (error) => {
switch (error.code) {
case 'WS_ERR_EXPECTED_FIN':
console.error('帧格式错误:缺少 FIN 位');
break;
case 'WS_ERR_EXPECTED_MASK':
console.error('客户端帧必须掩码');
break;
case 'WS_ERR_INVALID_CLOSE_CODE':
console.error('无效的关闭码');
break;
case 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH':
console.error('控制帧负载长度无效(最大 125 字节)');
break;
case 'WS_ERR_INVALID_OPCODE':
console.error('无效的操作码');
break;
case 'WS_ERR_INVALID_UTF8':
console.error('无效的 UTF-8 数据');
break;
case 'WS_ERR_UNEXPECTED_MASK':
console.error('服务器帧不应掩码');
break;
case 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH':
console.error('消息长度超过限制');
break;
default:
console.error('未知错误:', error.message);
}
});
});

headers 事件

在响应头发送前进行自定义修改:

const wss = new WebSocket.Server({
port: 8080,
headers: ['X-Custom-Header'] // 预定义要添加的头
});

wss.on('headers', (headers, request) => {
// 在响应头发送前可以修改
headers.push('X-Server-Time: ' + Date.now());
headers.push('X-Request-ID: ' + generateRequestId());

// 可以根据请求动态设置
const origin = request.headers.origin;
if (origin === 'https://trusted.example.com') {
headers.push('X-Trust-Level: high');
}
});

小结

本章介绍了在 Node.js 中实现 WebSocket 服务器的两种主要方式:

  1. ws 库:轻量级、高性能、完全遵循 WebSocket 标准
  2. Socket.IO:功能丰富、自动降级、内置房间和命名空间

选择建议:

  • 需要标准 WebSocket 协议、追求高性能:选择 ws
  • 需要自动降级、房间管理、命名空间:选择 Socket.IO

下一章将介绍 Python 服务端的 WebSocket 实现。