跳到主要内容

Python 服务端实现

Python 提供了多个优秀的 WebSocket 库,本章介绍最常用的 websockets 库和 FastAPI 框架中的 WebSocket 支持。

使用 websockets 库

websockets 是 Python 中最流行的 WebSocket 库,基于 asyncio 实现,性能优异,API 简洁。

安装

pip install websockets

基本服务器

创建一个简单的 WebSocket 服务器:

import asyncio
import websockets

async def handler(websocket):
print("客户端已连接")

try:
async for message in websocket:
print(f"收到消息: {message}")
await websocket.send(f"服务器收到: {message}")
except websockets.exceptions.ConnectionClosed:
print("客户端断开连接")

async def main():
async with websockets.serve(handler, "localhost", 8080):
print("WebSocket 服务器运行在 ws://localhost:8080")
await asyncio.Future()

if __name__ == "__main__":
asyncio.run(main())

获取客户端信息

import asyncio
import websockets

async def handler(websocket):
remote_addr = websocket.remote_address
path = websocket.request.path
headers = websocket.request.headers

print(f"客户端地址: {remote_addr}")
print(f"请求路径: {path}")
print(f"Origin: {headers.get('Origin')}")

async for message in websocket:
await websocket.send(f"Echo: {message}")

async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()

asyncio.run(main())

连接参数

通过路径参数传递信息:

import asyncio
import websockets
from urllib.parse import parse_qs

async def handler(websocket):
path = websocket.request.path
query = parse_qs(websocket.request.uri.query)

room = query.get('room', ['general'])[0]
username = query.get('username', ['匿名'])[0]

print(f"用户 {username} 加入房间 {room}")

async for message in websocket:
await websocket.send(f"[{room}] {username}: {message}")

async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()

asyncio.run(main())

发送不同类型消息

import asyncio
import websockets
import json

async def handler(websocket):
await websocket.send("文本消息")

await websocket.send(json.dumps({
"type": "greeting",
"message": "欢迎连接"
}))

await websocket.send(b"二进制数据")

data = bytes([1, 2, 3, 4, 5])
await websocket.send(data)

async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()

asyncio.run(main())

连接管理

实现连接管理器:

import asyncio
import websockets
from typing import Dict, Set

class ConnectionManager:
def __init__(self):
self.connections: Dict[str, websockets.WebSocketServerProtocol] = {}
self._id_counter = 0

async def add(self, websocket) -> str:
self._id_counter += 1
connection_id = str(self._id_counter)
self.connections[connection_id] = websocket
print(f"连接 {connection_id} 已添加,当前连接数: {len(self.connections)}")
return connection_id

def remove(self, connection_id: str):
if connection_id in self.connections:
del self.connections[connection_id]
print(f"连接 {connection_id} 已移除,当前连接数: {len(self.connections)}")

async def send_to(self, connection_id: str, message: str) -> bool:
ws = self.connections.get(connection_id)
if ws:
await ws.send(message)
return True
return False

async def broadcast(self, message: str, exclude: str = None):
for conn_id, ws in self.connections.items():
if conn_id != exclude:
try:
await ws.send(message)
except websockets.exceptions.ConnectionClosed:
pass

manager = ConnectionManager()

async def handler(websocket):
connection_id = await manager.add(websocket)

try:
async for message in websocket:
await manager.broadcast(f"用户 {connection_id}: {message}", exclude=connection_id)
finally:
manager.remove(connection_id)

async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()

asyncio.run(main())

心跳检测

import asyncio
import websockets

async def handler(websocket):
ping_task = asyncio.create_task(send_ping(websocket))

try:
async for message in websocket:
if message == "pong":
continue
await websocket.send(f"Echo: {message}")
finally:
ping_task.cancel()

async def send_ping(websocket):
try:
while True:
await asyncio.sleep(30)
await websocket.send("ping")
except websockets.exceptions.ConnectionClosed:
pass

async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()

asyncio.run(main())

聊天室示例

import asyncio
import websockets
import json
from typing import Dict, Set
from datetime import datetime

class ChatRoom:
def __init__(self, name: str):
self.name = name
self.clients: Dict[str, websockets.WebSocketServerProtocol] = {}
self.history: list = []

async def join(self, client_id: str, websocket, username: str):
self.clients[client_id] = {
'websocket': websocket,
'username': username
}

for msg in self.history[-50:]:
await websocket.send(json.dumps(msg))

await self.broadcast({
'type': 'system',
'content': f'{username} 加入了聊天室',
'timestamp': datetime.now().isoformat()
})

def leave(self, client_id: str):
if client_id in self.clients:
username = self.clients[client_id]['username']
del self.clients[client_id]
return username
return None

async def broadcast(self, message: dict, exclude: str = None):
self.history.append(message)
if len(self.history) > 100:
self.history.pop(0)

data = json.dumps(message)
for client_id, client_info in self.clients.items():
if client_id != exclude:
try:
await client_info['websocket'].send(data)
except:
pass

class ChatServer:
def __init__(self):
self.rooms: Dict[str, ChatRoom] = {}
self.client_rooms: Dict[str, str] = {}
self._id_counter = 0

def get_room(self, name: str) -> ChatRoom:
if name not in self.rooms:
self.rooms[name] = ChatRoom(name)
return self.rooms[name]

def generate_id(self) -> str:
self._id_counter += 1
return str(self._id_counter)

chat_server = ChatServer()

async def handler(websocket):
client_id = chat_server.generate_id()
current_room = None
username = "匿名"

try:
async for message in websocket:
try:
data = json.loads(message)
except json.JSONDecodeError:
await websocket.send(json.dumps({
'type': 'error',
'message': '无效的 JSON 格式'
}))
continue

action = data.get('action')

if action == 'join':
room_name = data.get('room', 'general')
username = data.get('username', '匿名')
current_room = chat_server.get_room(room_name)
chat_server.client_rooms[client_id] = room_name

await current_room.join(client_id, websocket, username)

elif action == 'message' and current_room:
await current_room.broadcast({
'type': 'message',
'username': username,
'content': data.get('content', ''),
'timestamp': datetime.now().isoformat()
}, exclude=client_id)

except websockets.exceptions.ConnectionClosed:
pass
finally:
if current_room:
left_username = current_room.leave(client_id)
if left_username:
await current_room.broadcast({
'type': 'system',
'content': f'{left_username} 离开了聊天室',
'timestamp': datetime.now().isoformat()
})

async def main():
async with websockets.serve(handler, "localhost", 8080):
print("聊天服务器运行在 ws://localhost:8080")
await asyncio.Future()

if __name__ == "__main__":
asyncio.run(main())

使用 FastAPI

FastAPI 内置 WebSocket 支持,可以方便地构建 WebSocket 服务。

基本用法

from fastapi import FastAPI, WebSocket
import uvicorn

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()

try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except Exception:
pass

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)

路径参数

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
await websocket.accept()

try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"[{room_name}] {data}")
except Exception:
pass

查询参数和依赖注入

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends

app = FastAPI()

async def get_token(token: str = Query(...)):
if token != "secret":
raise WebSocketDisconnect(code=1008)
return token

@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Depends(get_token)
):
await websocket.accept()

try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Token: {token}, Message: {data}")
except WebSocketDisconnect:
print("客户端断开连接")

消息类型处理

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Union
import json

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()

try:
while True:
message = await websocket.receive()

if "text" in message:
text = message["text"]
print(f"文本消息: {text}")
await websocket.send_text(f"收到文本: {text}")

elif "bytes" in message:
data = message["bytes"]
print(f"二进制消息: {len(data)} 字节")
await websocket.send_bytes(data)

elif message.get("type") == "websocket.disconnect":
break

except WebSocketDisconnect:
print("客户端断开")

@app.websocket("/ws/json")
async def json_endpoint(websocket: WebSocket):
await websocket.accept()

try:
while True:
data = await websocket.receive_json()
print(f"JSON 消息: {data}")
await websocket.send_json({
"received": data,
"timestamp": "2024-01-01T00:00:00"
})
except WebSocketDisconnect:
pass

连接管理器

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json

app = FastAPI()

class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)

async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except:
pass

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)

try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"用户 {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"用户 {client_id} 离开了聊天室")

完整聊天室示例

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List
from datetime import datetime
import json

app = FastAPI()

class ChatRoom:
def __init__(self, name: str):
self.name = name
self.connections: Dict[int, dict] = {}
self.messages: List[dict] = []

async def join(self, client_id: int, websocket: WebSocket, username: str):
await websocket.accept()
self.connections[client_id] = {
'websocket': websocket,
'username': username
}

for msg in self.messages[-50:]:
await websocket.send_json(msg)

await self.broadcast({
'type': 'system',
'content': f'{username} 加入了聊天室',
'timestamp': datetime.now().isoformat()
}, exclude=client_id)

async def leave(self, client_id: int):
if client_id in self.connections:
username = self.connections[client_id]['username']
del self.connections[client_id]
await self.broadcast({
'type': 'system',
'content': f'{username} 离开了聊天室',
'timestamp': datetime.now().isoformat()
})

async def broadcast(self, message: dict, exclude: int = None):
self.messages.append(message)
if len(self.messages) > 100:
self.messages.pop(0)

for client_id, conn in self.connections.items():
if client_id != exclude:
try:
await conn['websocket'].send_json(message)
except:
pass

class ChatManager:
def __init__(self):
self.rooms: Dict[str, ChatRoom] = {}
self.client_counter = 0

def get_room(self, name: str) -> ChatRoom:
if name not in self.rooms:
self.rooms[name] = ChatRoom(name)
return self.rooms[name]

def get_client_id(self) -> int:
self.client_counter += 1
return self.client_counter

chat_manager = ChatManager()

@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
client_id = chat_manager.get_client_id()
room = chat_manager.get_room(room_name)
username = f"用户{client_id}"

try:
await room.join(client_id, websocket, username)

while True:
data = await websocket.receive_json()

if data.get('type') == 'set_username':
username = data.get('username', username)
room.connections[client_id]['username'] = username
continue

await room.broadcast({
'type': 'message',
'username': username,
'content': data.get('content', ''),
'timestamp': datetime.now().isoformat()
})

except WebSocketDisconnect:
await room.leave(client_id)
except Exception as e:
print(f"错误: {e}")
await room.leave(client_id)

html = """
<!DOCTYPE html>
<html>
<head>
<title>聊天室</title>
<style>
body { font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; }
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.system { color: #666; font-style: italic; }
input { width: 70%; padding: 10px; }
button { padding: 10px 20px; }
</style>
</head>
<body>
<h1>聊天室</h1>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>

<script>
const roomName = 'general';
const ws = new WebSocket(`ws://localhost:8080/ws/${roomName}`);

ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = 'message ' + (data.type === 'system' ? 'system' : '');
messageDiv.textContent = data.type === 'system'
? data.content
: `${data.username}: ${data.content}`;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};

function sendMessage() {
const input = document.getElementById('messageInput');
if (input.value) {
ws.send(JSON.stringify({ content: input.value }));
input.value = '';
}
}

document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') sendMessage();
});
</script>
</body>
</html>
"""

@app.get("/")
async def get():
return HTMLResponse(html)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)

使用 Django Channels

Django Channels 扩展了 Django 的异步能力,支持 WebSocket。

安装

pip install channels

配置

# settings.py
INSTALLED_APPS = [
'channels',
...
]

ASGI_APPLICATION = 'myproject.asgi.application'

创建 Consumer

import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'

await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)

await self.accept()

async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)

async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']

await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)

async def chat_message(self, event):
message = event['message']

await self.send(text_data=json.dumps({
'message': message
}))

路由配置

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

小结

本章介绍了 Python 中实现 WebSocket 服务器的三种主要方式:

  1. websockets 库:轻量级、基于 asyncio、适合纯 WebSocket 服务
  2. FastAPI:现代 Web 框架、内置 WebSocket 支持、适合 REST API + WebSocket 混合场景
  3. Django Channels:Django 扩展、支持 WebSocket、适合 Django 项目

选择建议:

  • 纯 WebSocket 服务:选择 websockets
  • REST API + WebSocket:选择 FastAPI
  • Django 项目:选择 Django Channels