跳到主要内容

Node.js 服务端实现

Node.js 是构建 WebSocket 服务器的热门选择,得益于其事件驱动的非阻塞 I/O 模型,非常适合处理大量并发连接。本章介绍两种主流的 Node.js WebSocket 库:ws 和 Socket.IO。

使用 ws 库

ws 是 Node.js 中最流行、性能最高的 WebSocket 库,它完全遵循 WebSocket 协议标准,API 简洁高效。

安装

npm install ws

基本服务器

创建一个简单的 WebSocket 服务器:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws, request) => {
console.log('新客户端连接');

ws.on('message', (data) => {
console.log('收到消息:', data.toString());
ws.send('服务器收到: ' + data);
});

ws.on('close', (code, reason) => {
console.log('客户端断开:', code, reason.toString());
});

ws.on('error', (error) => {
console.error('连接错误:', error);
});

ws.send('欢迎连接 WebSocket 服务器');
});

console.log('WebSocket 服务器运行在 ws://localhost:8080');

配置选项

WebSocket.Server 支持多种配置选项:

const wss = new WebSocket.Server({
port: 8080,
host: '0.0.0.0',
backlog: 511,
maxPayload: 100 * 1024 * 1024,
clientTracking: true,
perMessageDeflate: {
zlibDeflateOptions: {
level: 3
},
zlibInflateOptions: {
chunkSize: 10 * 1024
},
clientNoContextTakeover: true,
serverNoContextTakeover: true,
clientMaxWindowBits: 10,
serverMaxWindowBits: 10
}
});

主要配置说明

  • port/host:监听端口和地址
  • maxPayload:最大消息大小(字节)
  • clientTracking:是否跟踪客户端连接
  • perMessageDeflate:消息压缩配置

结合 HTTP 服务器

通常 WebSocket 服务器与 HTTP 服务器共享端口:

const http = require('http');
const WebSocket = require('ws');
const express = require('express');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.get('/', (req, res) => {
res.send('HTTP 服务');
});

wss.on('connection', (ws, request) => {
const ip = request.socket.remoteAddress;
console.log('客户端 IP:', ip);

ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});

server.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

获取客户端信息

wss.on('connection', (ws, request) => {
const ip = request.socket.remoteAddress;
const port = request.socket.remotePort;
const headers = request.headers;
const url = request.url;
const cookies = request.headers.cookie;

console.log('客户端地址:', `${ip}:${port}`);
console.log('请求路径:', url);
console.log('Origin:', request.headers.origin);
});

验证客户端连接

使用 verifyClient 回调验证连接请求:

const wss = new WebSocket.Server({
port: 8080,
verifyClient: (info, callback) => {
const origin = info.origin;
const allowedOrigins = ['http://localhost:3000', 'https://example.com'];

if (!allowedOrigins.includes(origin)) {
callback(false, 403, 'Forbidden');
return;
}

const token = new URL(info.req.url, 'http://localhost').searchParams.get('token');
if (!validateToken(token)) {
callback(false, 401, 'Unauthorized');
return;
}

callback(true);
}
});

function validateToken(token) {
return token === 'secret-token';
}

发送消息

ws 库支持发送多种类型的数据:

wss.on('connection', (ws) => {
ws.send('文本消息');

ws.send(Buffer.from('Buffer 数据'));

ws.send(JSON.stringify({ type: 'data', value: 123 }));

ws.send(new Uint8Array([1, 2, 3, 4]));

ws.send('二进制数据', { binary: true });

ws.send('压缩消息', { compress: true });
});

广播消息

向所有客户端广播消息:

function broadcast(wss, message) {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}

wss.on('connection', (ws) => {
ws.on('message', (data) => {
broadcast(wss, data.toString());
});
});

排除发送者广播:

wss.on('connection', (ws) => {
ws.on('message', (data) => {
wss.clients.forEach((client) => {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
});

连接管理

实现连接管理器来跟踪所有连接:

class ConnectionManager {
constructor() {
this.connections = new Map();
}

add(id, ws) {
this.connections.set(id, ws);
console.log(`连接 ${id} 已添加,当前连接数: ${this.connections.size}`);
}

remove(id) {
this.connections.delete(id);
console.log(`连接 ${id} 已移除,当前连接数: ${this.connections.size}`);
}

get(id) {
return this.connections.get(id);
}

sendTo(id, message) {
const ws = this.connections.get(id);
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(message);
return true;
}
return false;
}

broadcast(message, excludeId = null) {
this.connections.forEach((ws, id) => {
if (id !== excludeId && ws.readyState === WebSocket.OPEN) {
ws.send(message);
}
});
}

get size() {
return this.connections.size;
}
}

const manager = new ConnectionManager();
let connectionId = 0;

wss.on('connection', (ws) => {
const id = ++connectionId;
manager.add(id, ws);

ws.on('message', (data) => {
const message = JSON.parse(data);
manager.broadcast(JSON.stringify({
from: id,
...message
}), id);
});

ws.on('close', () => {
manager.remove(id);
});
});

心跳检测

实现服务器端心跳检测:

function heartbeat() {
this.isAlive = true;
}

wss.on('connection', (ws) => {
ws.isAlive = true;
ws.on('pong', heartbeat);
});

const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}

ws.isAlive = false;
ws.ping();
});
}, 30000);

wss.on('close', () => {
clearInterval(interval);
});

聊天室示例

完整的聊天室实现:

const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });

class ChatRoom {
constructor(name) {
this.name = name;
this.clients = new Set();
this.history = [];
}

join(ws) {
this.clients.add(ws);
this.history.slice(-50).forEach(msg => {
ws.send(JSON.stringify(msg));
});
this.broadcast({
type: 'system',
content: `用户加入聊天室,当前 ${this.clients.size}`
}, ws);
}

leave(ws) {
this.clients.delete(ws);
this.broadcast({
type: 'system',
content: `用户离开聊天室,当前 ${this.clients.size}`
});
}

broadcast(message, exclude = null) {
const data = JSON.stringify({
...message,
timestamp: Date.now()
});

this.history.push(JSON.parse(data));
if (this.history.length > 100) {
this.history.shift();
}

this.clients.forEach(client => {
if (client !== exclude && client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
}
}

const rooms = new Map();

function getRoom(name) {
if (!rooms.has(name)) {
rooms.set(name, new ChatRoom(name));
}
return rooms.get(name);
}

wss.on('connection', (ws, request) => {
const url = new URL(request.url, 'http://localhost');
const roomName = url.pathname.slice(1) || 'general';
const username = url.searchParams.get('username') || '匿名';

const room = getRoom(roomName);
ws.username = username;
ws.room = room;

room.join(ws);

ws.on('message', (data) => {
try {
const message = JSON.parse(data);
room.broadcast({
type: 'message',
username: ws.username,
content: message.content
});
} catch (e) {
ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
}
});

ws.on('close', () => {
room.leave(ws);
});
});

console.log('聊天服务器运行在 ws://localhost:8080');

使用 Socket.IO

Socket.IO 是一个功能丰富的实时通信库,它在 WebSocket 基础上提供了更多功能,包括自动降级、房间管理、命名空间等。

安装

npm install socket.io

基本服务器

const http = require('http');
const { Server } = require('socket.io');

const server = http.createServer();
const io = new Server(server, {
cors: {
origin: 'http://localhost:3000',
methods: ['GET', 'POST']
}
});

io.on('connection', (socket) => {
console.log('用户连接:', socket.id);

socket.on('message', (data) => {
console.log('收到消息:', data);
socket.emit('response', '服务器收到: ' + data);
});

socket.on('disconnect', (reason) => {
console.log('用户断开:', socket.id, reason);
});
});

server.listen(8080, () => {
console.log('Socket.IO 服务器运行在端口 8080');
});

客户端使用

<script src="/socket.io/socket.io.js"></script>
<script>
const socket = io('http://localhost:8080');

socket.on('connect', () => {
console.log('已连接');
});

socket.emit('message', 'Hello');

socket.on('response', (data) => {
console.log('收到响应:', data);
});
</script>

房间管理

Socket.IO 内置房间功能:

io.on('connection', (socket) => {
socket.on('join-room', (roomName) => {
socket.join(roomName);
socket.emit('joined', roomName);
socket.to(roomName).emit('user-joined', socket.id);
});

socket.on('leave-room', (roomName) => {
socket.leave(roomName);
socket.to(roomName).emit('user-left', socket.id);
});

socket.on('room-message', (roomName, message) => {
io.to(roomName).emit('room-message', {
from: socket.id,
message: message
});
});

socket.on('disconnecting', () => {
const rooms = socket.rooms;
rooms.forEach(room => {
socket.to(room).emit('user-left', socket.id);
});
});
});

命名空间

使用命名空间分隔不同的通信通道:

const chatNamespace = io.of('/chat');
chatNamespace.on('connection', (socket) => {
console.log('聊天连接:', socket.id);

socket.on('message', (data) => {
chatNamespace.emit('message', data);
});
});

const notificationNamespace = io.of('/notifications');
notificationNamespace.on('connection', (socket) => {
console.log('通知连接:', socket.id);

socket.on('subscribe', (userId) => {
socket.join(`user-${userId}`);
});
});

function sendNotification(userId, notification) {
notificationNamespace.to(`user-${userId}`).emit('notification', notification);
}

中间件

使用中间件进行认证:

io.use((socket, next) => {
const token = socket.handshake.auth.token;

if (!token) {
return next(new Error('认证失败'));
}

try {
const user = verifyToken(token);
socket.user = user;
next();
} catch (err) {
next(new Error('无效的 token'));
}
});

io.on('connection', (socket) => {
console.log('用户已认证:', socket.user);
});

广播事件

io.on('connection', (socket) => {
socket.emit('welcome', '欢迎连接');

socket.broadcast.emit('user-connected', socket.id);

io.emit('announcement', '服务器公告');

io.to('room1').emit('room-message', '房间消息');

socket.to('room1').emit('room-message', '排除发送者');

io.except('room1').emit('broadcast', '排除特定房间');
});

Express 集成

ws 与 Express 集成

const express = require('express');
const http = require('http');
const WebSocket = require('ws');

const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.use(express.json());
app.use(express.static('public'));

app.get('/api/status', (req, res) => {
res.json({
connections: wss.clients.size,
status: 'running'
});
});

wss.on('connection', (ws, request) => {
ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});

server.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

Socket.IO 与 Express 集成

const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: 'http://localhost:3000'
}
});

app.get('/', (req, res) => {
res.sendFile(__dirname + '/index.html');
});

app.get('/api/rooms', (req, res) => {
const rooms = io.sockets.adapter.rooms;
res.json(Array.from(rooms.keys()));
});

io.on('connection', (socket) => {
socket.on('message', (data) => {
io.emit('message', data);
});
});

httpServer.listen(8080, () => {
console.log('服务器运行在 http://localhost:8080');
});

生产环境部署

使用 PM2 集群模式

const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit', (worker) => {
console.log(`Worker ${worker.process.pid} 退出`);
cluster.fork();
});
} else {
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
ws.on('message', (data) => {
ws.send('Echo: ' + data);
});
});
}

使用 Redis 适配器

对于多进程/多服务器部署,使用 Redis 同步消息:

const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

const io = new Server(8080, {
adapter: createAdapter(pubClient, subClient)
});

io.on('connection', (socket) => {
socket.on('message', (data) => {
io.emit('message', data);
});
});

Nginx 配置

upstream websocket {
server 127.0.0.1:8080;
}

server {
listen 80;
server_name example.com;

location / {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 86400;
}
}

小结

本章介绍了在 Node.js 中实现 WebSocket 服务器的两种主要方式:

  1. ws 库:轻量级、高性能、完全遵循 WebSocket 标准
  2. Socket.IO:功能丰富、自动降级、内置房间和命名空间

选择建议:

  • 需要标准 WebSocket 协议、追求高性能:选择 ws
  • 需要自动降级、房间管理、命名空间:选择 Socket.IO

下一章将介绍 Python 服务端的 WebSocket 实现。