Node.js 服务端实现
Node.js 是构建 WebSocket 服务器的热门选择,得益于其事件驱动的非阻塞 I/O 模型,非常适合处理大量并发连接。本章介绍两种主流的 Node.js WebSocket 库:ws 和 Socket.IO。
使用 ws 库
ws 是 Node.js 中最流行、性能最高的 WebSocket 库,它完全遵循 WebSocket 协议标准,API 简洁高效。
安装
npm install ws
基本服务器
创建一个简单的 WebSocket 服务器:
const WebSocket = require('ws');

// Standalone WebSocket server listening on port 8080.
const wss = new WebSocket.Server({ port: 8080 });

/**
 * Attaches the per-connection listeners and greets the new client.
 *
 * @param {object} ws - The socket for the client that just connected.
 * @param {object} request - The HTTP request that initiated the connection (unused here).
 */
const handleConnection = (ws, request) => {
  console.log('新客户端连接');

  // Log each incoming message and acknowledge it to the sender.
  ws.on('message', (payload) => {
    console.log('收到消息:', payload.toString());
    ws.send(`服务器收到: ${payload}`);
  });

  // Report the close code and reason when the client goes away.
  ws.on('close', (closeCode, closeReason) => {
    console.log('客户端断开:', closeCode, closeReason.toString());
  });

  ws.on('error', (err) => {
    console.error('连接错误:', err);
  });

  ws.send('欢迎连接 WebSocket 服务器');
};

wss.on('connection', handleConnection);

console.log('WebSocket 服务器运行在 ws://localhost:8080');
配置选项
WebSocket.Server 支持多种配置选项:
// Message-compression settings, pulled out of the main options object for readability.
const perMessageDeflate = {
  zlibDeflateOptions: { level: 3 },
  zlibInflateOptions: { chunkSize: 10 * 1024 },
  clientNoContextTakeover: true,
  serverNoContextTakeover: true,
  clientMaxWindowBits: 10,
  serverMaxWindowBits: 10,
};

const wss = new WebSocket.Server({
  // Listening port and address.
  port: 8080,
  host: '0.0.0.0',
  backlog: 511,
  // Maximum message size in bytes (100 * 1024 * 1024).
  maxPayload: 100 * 1024 * 1024,
  // Whether the server keeps track of connected clients.
  clientTracking: true,
  perMessageDeflate,
});
主要配置说明:
- port/host:监听端口和地址
- maxPayload:最大消息大小(字节)
- clientTracking:是否跟踪客户端连接
- perMessageDeflate:消息压缩配置
结合 HTTP 服务器
通常 WebSocket 服务器与 HTTP 服务器共享端口:
const http = require('http');
const WebSocket = require('ws');
const express = require('express');

// One HTTP server carries both the Express app and the WebSocket server.
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.get('/', (req, res) => {
  res.send('HTTP 服务');
});

wss.on('connection', (ws, request) => {
  // Log the remote address taken from the underlying socket of the request.
  const { remoteAddress: ip } = request.socket;
  console.log('客户端 IP:', ip);

  // Echo every message back to its sender.
  ws.on('message', (payload) => {
    ws.send(`Echo: ${payload}`);
  });
});

server.listen(8080, () => {
  console.log('服务器运行在 http://localhost:8080');
});
获取客户端信息
// Shows which client details can be read from the upgrade request on connection.
wss.on('connection', (ws, request) => {
  const { socket, headers, url } = request;
  const { remoteAddress: ip, remotePort: port } = socket;
  // Raw Cookie header; extracted for illustration only and not used below.
  const cookies = headers.cookie;

  console.log('客户端地址:', `${ip}:${port}`);
  console.log('请求路径:', url);
  console.log('Origin:', headers.origin);
});
验证客户端连接
使用 verifyClient 回调验证连接请求(注意:ws 官方文档不推荐使用 verifyClient,建议改为在 HTTP 服务器的 upgrade 事件中完成认证;此处仅作演示):
// Origins that are allowed to open a WebSocket connection.
const ALLOWED_ORIGINS = ['http://localhost:3000', 'https://example.com'];

const wss = new WebSocket.Server({
  port: 8080,
  /**
   * Accepts or rejects a connection request before it is established.
   * The two-parameter form is kept so the decision is reported through the callback.
   *
   * @param {object} info - Request details; `origin` and `req` are read here.
   * @param {Function} done - Called with (accepted, httpStatus, reason).
   */
  verifyClient: ({ origin, req }, done) => {
    // Reject requests whose Origin is not on the allow-list.
    if (!ALLOWED_ORIGINS.includes(origin)) {
      done(false, 403, 'Forbidden');
      return;
    }

    // The token is read from the `token` query parameter of the request URL.
    const { searchParams } = new URL(req.url, 'http://localhost');
    if (!validateToken(searchParams.get('token'))) {
      done(false, 401, 'Unauthorized');
      return;
    }

    done(true);
  },
});
/**
 * Checks a client-supplied token against the single hard-coded demo value.
 *
 * @param {*} token - Token taken from the connection request; may be null.
 * @returns {boolean} True only when the token is exactly the expected string.
 */
function validateToken(token) {
  const expectedToken = 'secret-token';
  if (token !== expectedToken) {
    return false;
  }
  return true;
}
发送消息
ws 库支持发送多种类型的数据:
// Demonstrates the kinds of payload that can be passed to ws.send().
wss.on('connection', (ws) => {
  const bufferPayload = Buffer.from('Buffer 数据');
  const jsonPayload = JSON.stringify({ type: 'data', value: 123 });
  const bytesPayload = new Uint8Array([1, 2, 3, 4]);

  ws.send('文本消息');
  ws.send(bufferPayload);
  ws.send(jsonPayload);
  ws.send(bytesPayload);
  // Per-message options: send as binary, or request compression.
  ws.send('二进制数据', { binary: true });
  ws.send('压缩消息', { compress: true });
});
广播消息
向所有客户端广播消息:
/**
 * Sends a message to every client of the server whose connection is open.
 *
 * @param {object} wss - Server object exposing a `clients` collection.
 * @param {*} message - Payload passed unchanged to each client's `send`.
 */
function broadcast(wss, message) {
  for (const client of wss.clients) {
    // Skip sockets that are not currently open.
    if (client.readyState !== WebSocket.OPEN) {
      continue;
    }
    client.send(message);
  }
}
// Relay every received message, converted to a string, to all connected clients.
wss.on('connection', (ws) => {
  const relayToAll = (payload) => {
    const text = payload.toString();
    broadcast(wss, text);
  };
  ws.on('message', relayToAll);
});
排除发送者广播:
// Relay each received message to every other open client, excluding the sender.
wss.on('connection', (ws) => {
  ws.on('message', (data, isBinary) => {
    // Forward the frame with the same type it arrived with. Without an explicit
    // `binary` flag, a received text message (delivered as a Buffer) would be
    // re-sent as a binary frame. When `isBinary` is not supplied, fall back to
    // inferring the type from the payload itself.
    const binary = typeof isBinary === 'boolean' ? isBinary : typeof data !== 'string';

    for (const client of wss.clients) {
      if (client !== ws && client.readyState === WebSocket.OPEN) {
        client.send(data, { binary });
      }
    }
  });
});
连接管理
实现连接管理器来跟踪所有连接:
/**
 * Keeps a registry of active connections keyed by an id, with helpers for
 * targeted sends and broadcasts.
 */
class ConnectionManager {
  constructor() {
    // id -> socket
    this.connections = new Map();
  }

  /** Registers a socket under the given id and logs the new total. */
  add(id, ws) {
    this.connections.set(id, ws);
    console.log(`连接 ${id} 已添加,当前连接数: ${this.connections.size}`);
  }

  /** Forgets the socket stored under the given id and logs the new total. */
  remove(id) {
    this.connections.delete(id);
    console.log(`连接 ${id} 已移除,当前连接数: ${this.connections.size}`);
  }

  /** Returns the socket stored under the id, or undefined when there is none. */
  get(id) {
    return this.connections.get(id);
  }

  /**
   * Sends a message to a single connection.
   *
   * @returns {boolean} True when the message was handed to an open socket,
   *   false when the id is unknown or the socket is not open.
   */
  sendTo(id, message) {
    const target = this.connections.get(id);
    if (!target || target.readyState !== WebSocket.OPEN) {
      return false;
    }
    target.send(message);
    return true;
  }

  /**
   * Sends a message to every open connection, optionally skipping one id.
   *
   * @param {*} message - Payload passed unchanged to each socket's `send`.
   * @param {*} [excludeId=null] - Id of the connection that must not receive it.
   */
  broadcast(message, excludeId = null) {
    for (const [id, socket] of this.connections) {
      if (id === excludeId || socket.readyState !== WebSocket.OPEN) {
        continue;
      }
      socket.send(message);
    }
  }

  /** Number of registered connections. */
  get size() {
    return this.connections.size;
  }
}
const manager = new ConnectionManager();
// Monotonically increasing counter used to give each connection a unique id.
let connectionId = 0;

wss.on('connection', (ws) => {
  const id = ++connectionId;
  manager.add(id, ws);

  ws.on('message', (data) => {
    // Parse defensively: an exception thrown from this handler would otherwise
    // propagate out of the event emitter for a single malformed message.
    let message;
    try {
      message = JSON.parse(data);
    } catch (err) {
      ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
      return;
    }

    // Only plain JSON objects can be merged into the outgoing envelope.
    if (message === null || typeof message !== 'object' || Array.isArray(message)) {
      ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
      return;
    }

    // `from` is written after the spread so a client cannot spoof the sender id
    // by including its own `from` field in the payload.
    manager.broadcast(JSON.stringify({ ...message, from: id }), id);
  });

  ws.on('close', () => {
    manager.remove(id);
  });
});
心跳检测
实现服务器端心跳检测:
// Interval between heartbeat sweeps, in milliseconds.
const HEARTBEAT_INTERVAL_MS = 30000;

/**
 * 'pong' listener: marks the socket it is invoked on (`this`) as alive.
 */
function heartbeat() {
  this.isAlive = true;
}

wss.on('connection', (ws) => {
  // A new connection starts out alive and refreshes the flag on every pong.
  ws.isAlive = true;
  ws.on('pong', heartbeat);
});

// On every sweep, terminate sockets that did not answer the previous ping,
// then clear the flag and ping the remaining ones again.
const interval = setInterval(() => {
  for (const ws of wss.clients) {
    if (ws.isAlive === false) {
      ws.terminate();
      continue;
    }
    ws.isAlive = false;
    ws.ping();
  }
}, HEARTBEAT_INTERVAL_MS);

// Stop sweeping once the server itself is closed.
wss.on('close', () => {
  clearInterval(interval);
});
聊天室示例
完整的聊天室实现:
// Load the ws library and create the WebSocket server (port 8080) used by the chat room example.
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
/**
 * A named chat room: tracks member sockets and keeps a bounded message history.
 */
class ChatRoom {
  /**
   * @param {string} name - Room name.
   */
  constructor(name) {
    this.name = name;
    this.clients = new Set();
    this.history = [];
  }

  /**
   * Adds a socket to the room, replays up to the last 50 history entries to it,
   * and announces the arrival to the other members.
   */
  join(ws) {
    this.clients.add(ws);

    const recentMessages = this.history.slice(-50);
    for (const entry of recentMessages) {
      ws.send(JSON.stringify(entry));
    }

    const announcement = {
      type: 'system',
      content: `用户加入聊天室,当前 ${this.clients.size} 人`
    };
    this.broadcast(announcement, ws);
  }

  /**
   * Removes a socket from the room and announces the departure to the remaining members.
   */
  leave(ws) {
    this.clients.delete(ws);

    const announcement = {
      type: 'system',
      content: `用户离开聊天室,当前 ${this.clients.size} 人`
    };
    this.broadcast(announcement);
  }

  /**
   * Timestamps a message, records it in the history (capped at 100 entries),
   * and sends it to every open member socket except `exclude`.
   *
   * @param {object} message - Message fields; a `timestamp` is added.
   * @param {object|null} [exclude=null] - Socket that must not receive the message.
   */
  broadcast(message, exclude = null) {
    const stamped = { ...message, timestamp: Date.now() };
    const data = JSON.stringify(stamped);

    // Store a copy rebuilt from the serialized form, i.e. exactly what clients receive.
    this.history.push(JSON.parse(data));
    if (this.history.length > 100) {
      this.history.shift();
    }

    for (const client of this.clients) {
      if (client === exclude || client.readyState !== WebSocket.OPEN) {
        continue;
      }
      client.send(data);
    }
  }
}
// Registry of chat rooms keyed by room name.
const rooms = new Map();

/**
 * Returns the room registered under `name`, creating and registering it on first use.
 *
 * @param {string} name - Room name.
 * @returns {ChatRoom} The existing or newly created room.
 */
function getRoom(name) {
  let room = rooms.get(name);
  if (room === undefined) {
    room = new ChatRoom(name);
    rooms.set(name, room);
  }
  return room;
}
// Route each new connection into a chat room chosen by the request path, with
// the display name taken from the `username` query parameter.
wss.on('connection', (ws, request) => {
  const url = new URL(request.url, 'http://localhost');
  // "/lobby" -> "lobby"; an empty path falls back to the default room.
  const roomName = url.pathname.slice(1) || 'general';
  const username = url.searchParams.get('username') || '匿名';
  const room = getRoom(roomName);

  ws.username = username;
  ws.room = room;
  room.join(ws);

  ws.on('message', (data) => {
    // Only the parsing step is guarded, so that a failure while broadcasting is
    // not misreported to the sender as a malformed message.
    let message;
    try {
      message = JSON.parse(data);
    } catch (e) {
      ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
      return;
    }

    // Valid JSON that is not an object (e.g. `null`) has no `content` to relay.
    if (message === null || typeof message !== 'object') {
      ws.send(JSON.stringify({ type: 'error', message: '无效的消息格式' }));
      return;
    }

    room.broadcast({
      type: 'message',
      username: ws.username,
      content: message.content
    });
  });

  ws.on('close', () => {
    room.leave(ws);
    // Room names come from the client-chosen URL path, so drop rooms once they
    // are empty; otherwise the registry grows without bound.
    if (room.clients.size === 0 && rooms.get(roomName) === room) {
      rooms.delete(roomName);
    }
  });
});

console.log('聊天服务器运行在 ws://localhost:8080');
使用 Socket.IO
Socket.IO 是一个功能丰富的实时通信库,它在 WebSocket 基础上提供了更多功能,包括自动降级、房间管理、命名空间等。
安装
npm install socket.io
基本服务器
const http = require('http');
const { Server } = require('socket.io');

// CORS settings applied to the Socket.IO server.
const corsOptions = {
  origin: 'http://localhost:3000',
  methods: ['GET', 'POST'],
};

const server = http.createServer();
const io = new Server(server, { cors: corsOptions });

/**
 * Per-connection setup: log the connection, answer 'message' events with a
 * 'response' event, and log disconnects together with their reason.
 */
const handleConnection = (socket) => {
  console.log('用户连接:', socket.id);

  socket.on('message', (payload) => {
    console.log('收到消息:', payload);
    socket.emit('response', `服务器收到: ${payload}`);
  });

  socket.on('disconnect', (reason) => {
    console.log('用户断开:', socket.id, reason);
  });
};

io.on('connection', handleConnection);

server.listen(8080, () => {
  console.log('Socket.IO 服务器运行在端口 8080');
});
客户端使用
<script src="/socket.io/socket.io.js"></script>
<script>
  // Open a connection to the Socket.IO server.
  const socket = io('http://localhost:8080');

  // Named handlers keep the event wiring below easy to scan.
  const handleConnect = () => {
    console.log('已连接');
  };
  const handleResponse = (payload) => {
    console.log('收到响应:', payload);
  };

  socket.on('connect', handleConnect);
  // Send a 'message' event; the server answers with a 'response' event.
  socket.emit('message', 'Hello');
  socket.on('response', handleResponse);
</script>
房间管理
Socket.IO 内置房间功能:
// Room handling: join/leave notifications, room-scoped messages, and a
// departure notice to each room when a socket is about to disconnect.
io.on('connection', (socket) => {
  socket.on('join-room', (roomName) => {
    socket.join(roomName);
    // Confirm to the joiner, then tell the other members of the room.
    socket.emit('joined', roomName);
    socket.to(roomName).emit('user-joined', socket.id);
  });

  socket.on('leave-room', (roomName) => {
    socket.leave(roomName);
    socket.to(roomName).emit('user-left', socket.id);
  });

  socket.on('room-message', (roomName, message) => {
    // Only members may post to a room. Without this check any client could
    // target an arbitrary room name, including another socket's private id room.
    if (!socket.rooms.has(roomName)) {
      return;
    }
    io.to(roomName).emit('room-message', {
      from: socket.id,
      message: message
    });
  });

  // 'disconnecting' fires while socket.rooms is still populated.
  socket.on('disconnecting', () => {
    for (const room of socket.rooms) {
      // socket.rooms also contains the socket's own id room; nobody else is in it.
      if (room === socket.id) {
        continue;
      }
      socket.to(room).emit('user-left', socket.id);
    }
  });
});
命名空间
使用命名空间分隔不同的通信通道:
// '/chat' namespace: every 'message' event is re-emitted to all sockets in the namespace.
const chatNamespace = io.of('/chat');

chatNamespace.on('connection', (socket) => {
  console.log('聊天连接:', socket.id);

  const relayToNamespace = (payload) => {
    chatNamespace.emit('message', payload);
  };
  socket.on('message', relayToNamespace);
});

// '/notifications' namespace: sockets subscribe to a per-user room.
const notificationNamespace = io.of('/notifications');

notificationNamespace.on('connection', (socket) => {
  console.log('通知连接:', socket.id);

  // NOTE(review): the userId comes straight from the client and is not checked here.
  socket.on('subscribe', (userId) => {
    const userRoom = `user-${userId}`;
    socket.join(userRoom);
  });
});
/**
 * Emits a 'notification' event to the room of one user in the notifications namespace.
 *
 * @param {*} userId - User id; the target room is `user-<userId>`.
 * @param {*} notification - Payload delivered with the event.
 */
function sendNotification(userId, notification) {
  const targetRoom = `user-${userId}`;
  const recipients = notificationNamespace.to(targetRoom);
  recipients.emit('notification', notification);
}
中间件
使用中间件进行认证:
/**
 * Authentication middleware: requires a token in the handshake `auth` payload
 * and attaches the verified user to the socket.
 */
const authenticate = (socket, next) => {
  const { token } = socket.handshake.auth;

  // No token supplied: refuse the connection.
  if (!token) {
    return next(new Error('认证失败'));
  }

  try {
    // verifyToken is defined elsewhere; a throw is treated as an invalid token.
    socket.user = verifyToken(token);
    next();
  } catch (err) {
    next(new Error('无效的 token'));
  }
};

io.use(authenticate);

io.on('connection', (socket) => {
  console.log('用户已认证:', socket.user);
});
广播事件
// Overview of the different emit targets available on connection.
io.on('connection', (socket) => {
  const room = 'room1';

  // To this socket only.
  socket.emit('welcome', '欢迎连接');
  // Via socket.broadcast.
  socket.broadcast.emit('user-connected', socket.id);
  // Via the server instance.
  io.emit('announcement', '服务器公告');
  // To a room through the server, then through the socket.
  io.to(room).emit('room-message', '房间消息');
  socket.to(room).emit('room-message', '排除发送者');
  // With a room excluded.
  io.except(room).emit('broadcast', '排除特定房间');
});
Express 集成
ws 与 Express 集成
const express = require('express');
const http = require('http');
const WebSocket = require('ws');

// Express app and WebSocket server share one HTTP server.
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

app.use(express.json());
app.use(express.static('public'));

// Status endpoint reporting the number of tracked WebSocket clients.
app.get('/api/status', (req, res) => {
  const { size: connections } = wss.clients;
  res.json({ connections, status: 'running' });
});

wss.on('connection', (ws, request) => {
  // Echo every message back to its sender.
  const echo = (payload) => {
    ws.send(`Echo: ${payload}`);
  };
  ws.on('message', echo);
});

server.listen(8080, () => {
  console.log('服务器运行在 http://localhost:8080');
});
Socket.IO 与 Express 集成
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');

// Express app and Socket.IO server share one HTTP server.
const app = express();
const httpServer = createServer(app);
const io = new Server(httpServer, {
  cors: {
    origin: 'http://localhost:3000'
  }
});

app.get('/', (req, res) => {
  res.sendFile(__dirname + '/index.html');
});

// Lists the rooms of the default namespace.
app.get('/api/rooms', (req, res) => {
  const { rooms, sids } = io.sockets.adapter;
  // Every socket automatically joins a private room named after its own id.
  // Those entries are not real rooms and would leak socket ids, so they are
  // filtered out using the adapter's socket-id map.
  const roomNames = Array.from(rooms.keys()).filter((name) => !sids.has(name));
  res.json(roomNames);
});

io.on('connection', (socket) => {
  // Re-emit every 'message' event to all connected sockets.
  socket.on('message', (data) => {
    io.emit('message', data);
  });
});

httpServer.listen(8080, () => {
  console.log('服务器运行在 http://localhost:8080');
});
生产环境部署
使用集群模式(Node.js cluster 模块,亦可交由 PM2 管理)
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

// `cluster.isMaster` is deprecated in favour of `cluster.isPrimary`; fall back
// to the old property on runtimes that do not provide the new one.
const isPrimaryProcess =
  typeof cluster.isPrimary === 'boolean' ? cluster.isPrimary : cluster.isMaster;

if (isPrimaryProcess) {
  // Primary process: start one worker per CPU reported by the OS.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // Replace any worker that exits so the pool keeps its size.
  cluster.on('exit', (worker) => {
    console.log(`Worker ${worker.process.pid} 退出`);
    cluster.fork();
  });
} else {
  // Worker process: each worker runs its own echo WebSocket server on port 8080.
  const WebSocket = require('ws');
  const wss = new WebSocket.Server({ port: 8080 });

  wss.on('connection', (ws) => {
    ws.on('message', (data) => {
      ws.send('Echo: ' + data);
    });
  });
}
使用 Redis 适配器
对于多进程/多服务器部署,使用 Redis 同步消息:
const { Server } = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const { createClient } = require('redis');

// Two Redis connections are needed: one for publishing and one for subscribing.
const pubClient = createClient({ url: 'redis://localhost:6379' });
const subClient = pubClient.duplicate();

// Listen for client errors so an emitted 'error' event does not go unhandled.
pubClient.on('error', (err) => {
  console.error('Redis 发布客户端错误:', err);
});
subClient.on('error', (err) => {
  console.error('Redis 订阅客户端错误:', err);
});

// Created without a port; the server starts listening only once the adapter is in place.
const io = new Server();

io.on('connection', (socket) => {
  // Re-emit every 'message' event to all sockets through the adapter.
  socket.on('message', (data) => {
    io.emit('message', data);
  });
});

// The clients must be connected explicitly before the adapter can use them.
Promise.all([pubClient.connect(), subClient.connect()])
  .then(() => {
    io.adapter(createAdapter(pubClient, subClient));
    io.listen(8080);
  })
  .catch((err) => {
    console.error('Redis 连接失败:', err);
  });
Nginx 配置
# Backend pool: the Node.js server listening locally on port 8080.
upstream websocket {
server 127.0.0.1:8080;
}
# Public-facing server that proxies all requests for example.com to the backend pool.
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://websocket;
# Use HTTP/1.1 and forward the Upgrade/Connection headers so the WebSocket
# handshake can pass through the proxy.
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
# Pass the original host and client address on to the backend.
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# Read timeout of 86400 (seconds, i.e. 24 hours), presumably so that idle
# long-lived connections are not closed early.
proxy_read_timeout 86400;
}
}
小结
本章介绍了在 Node.js 中实现 WebSocket 服务器的两种主要方式:
- ws 库:轻量级、高性能、完全遵循 WebSocket 标准
- Socket.IO:功能丰富、自动降级、内置房间和命名空间
选择建议:
- 需要标准 WebSocket 协议、追求高性能:选择 ws
- 需要自动降级、房间管理、命名空间:选择 Socket.IO
下一章将介绍 Python 服务端的 WebSocket 实现。