Python 服务端实现
Python 提供了多个优秀的 WebSocket 库,本章介绍最常用的 websockets 库和 FastAPI 框架中的 WebSocket 支持。
使用 websockets 库
websockets 是 Python 中最流行、最成熟的 WebSocket 库,它专注于正确性、简洁性、健壮性和性能。该库支持多种网络 I/O 和控制流范式。
安装
pip install websockets
版本架构说明
websockets 库提供了三种实现方式,适用于不同的使用场景:
| 实现方式 | 模块路径 | 适用场景 |
|---|---|---|
| asyncio(推荐) | websockets.asyncio | 服务端处理大量并发连接,客户端需要异步操作 |
| threading | websockets.sync | 不熟悉 asyncio 的开发者,服务端连接数较少的场景 |
| Sans-I/O | websockets.sansio | 集成到第三方库或应用服务器中 |
版本演进:websockets 13.0 引入了新的 asyncio 实现(websockets.asyncio),它基于 Sans-I/O 实现,提供了更多功能。旧版实现(websockets.legacy)在 14.0 版本中被弃用,将于 2030 年移除。websockets 14.0 将新的 asyncio 实现设为默认,要求 Python ≥ 3.9。websockets 15.0(2025年2月)新增了路由功能(route()和unix_route()),threading 实现也开始支持心跳检测。新项目应使用新版 API,旧项目应尽快迁移。
asyncio 实现(推荐)
asyncio 实现基于 Python 内置的异步 I/O 库,提供了优雅的协程 API,非常适合处理大量并发连接的服务端。
基本服务器:
import asyncio
from websockets.asyncio.server import serve
async def echo_handler(websocket):
    """Echo every message from one client connection back to that client.

    Prints the peer address on entry, each received message, any exception
    raised while serving, and a final notice when the handler exits.
    """
    print(f"客户端已连接: {websocket.remote_address}")
    try:
        # Iterating the connection asynchronously yields incoming messages.
        async for message in websocket:
            print(f"收到消息: {message}")
            reply = f"服务器收到: {message}"
            await websocket.send(reply)
    except Exception as e:
        print(f"连接异常: {e}")
    finally:
        print("客户端断开连接")
async def main():
    """Start the echo server on localhost:8080 and serve until stopped."""
    host, port = "localhost", 8080
    # serve() is used as an async context manager that yields the server.
    async with serve(echo_handler, host, port) as server:
        print("WebSocket 服务器运行在 ws://localhost:8080")
        # serve_forever() keeps the server running.
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
服务器配置选项:
from websockets.asyncio.server import serve
async def main():
    """Run a server with the commonly tuned ``serve()`` options spelled out."""
    options = {
        "host": "0.0.0.0",                      # listening address
        "port": 8080,                           # listening port
        "ping_interval": 20,                    # seconds between pings, default 20
        "ping_timeout": 20,                     # seconds to wait for a pong, default 20
        "close_timeout": 10,                    # seconds allowed for closing, default 10
        "max_size": 2**20,                      # max message size in bytes, default 1 MiB
        "compression": "deflate",               # "deflate" or None
        "origins": ["http://localhost:3000"],   # allowed Origin values
    }
    async with serve(handler, **options) as server:
        await server.serve_forever()
路由功能(websockets 15.0+)
websockets 15.0 引入了路由功能,可以根据请求路径将连接分发到不同的处理器,这在构建包含多个 WebSocket 端点的应用时非常有用。该功能依赖第三方库 werkzeug(pip install werkzeug)。
import asyncio

from websockets.asyncio.router import route
# route() relies on werkzeug's URL routing: pip install werkzeug
from werkzeug.routing import Map, Rule


async def chat_handler(websocket):
    """Handle a chat connection: echo each message with a chat prefix."""
    print(f"聊天连接: {websocket.request.path}")
    async for message in websocket:
        await websocket.send(f"[聊天] {message}")


async def notification_handler(websocket):
    """Handle a notification connection: echo with a notification prefix."""
    print(f"通知连接: {websocket.request.path}")
    async for message in websocket:
        await websocket.send(f"[通知] {message}")


async def default_handler(websocket, **url_vars):
    """Close connections that reach an unknown endpoint.

    ``url_vars`` absorbs the path variable captured by the catch-all rule;
    the router passes matched URL variables as keyword arguments.
    """
    await websocket.close(1003, "未知的端点")


# Routing table. Each Rule maps a URL pattern to a handler via ``endpoint``.
routes = Map([
    Rule("/chat", endpoint=chat_handler),
    Rule("/notifications", endpoint=notification_handler),
    # Catch-all rules: without them, unmatched paths are rejected with
    # HTTP 404 during the handshake instead of reaching default_handler.
    Rule("/", endpoint=default_handler),
    Rule("/<path:unknown>", endpoint=default_handler),
])


async def main():
    """Serve the routing table on localhost:8080 until stopped."""
    # route() replaces serve(): URL map first, then serve()'s usual arguments.
    async with route(routes, "localhost", 8080) as server:
        print("WebSocket 服务器运行在 ws://localhost:8080")
        print("可用端点: /chat, /notifications")
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
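路由功能的工作原理是:当客户端连接时,服务器会根据请求路径在路由表中查找匹配的规则,如果找到则调用对应的处理器;如果没有任何规则匹配,握手会以 HTTP 404 被拒绝(示例中的默认处理器用于接管未知路径)。路径匹配由 werkzeug 的路由系统完成,除固定路径外还支持 <name>、<int:id> 这类变量和转换器,匹配到的变量会以关键字参数的形式传给处理器。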
Unix Socket 路由:
对于通过 Unix socket 进行本地通信的场景,可以使用 unix_route():
import asyncio

from websockets.asyncio.router import unix_route
# unix_route() relies on werkzeug's URL routing: pip install werkzeug
from werkzeug.routing import Map, Rule


async def main():
    """Serve the chat and notification handlers over a Unix socket."""
    routes = Map([
        Rule("/chat", endpoint=chat_handler),
        Rule("/notifications", endpoint=notification_handler),
    ])
    # unix_route() takes the URL map followed by the socket path. Requests
    # whose path matches no rule are rejected with HTTP 404 by the router.
    async with unix_route(routes, "/tmp/websocket.sock") as server:
        await server.serve_forever()
threading 实现
如果你不熟悉 asyncio,或者服务端只需要处理少量连接,threading 实现是一个简单的替代方案。
from websockets.sync.server import serve
def echo_handler(websocket):
    """Echo every message from one client (plain function, not a coroutine).

    Prints the peer address on entry, each received message, any exception
    raised while serving, and a final notice when the handler exits.
    """
    print(f"客户端已连接: {websocket.remote_address}")
    try:
        # A plain ``for`` loop receives messages synchronously.
        for message in websocket:
            print(f"收到消息: {message}")
            reply = f"服务器收到: {message}"
            websocket.send(reply)
    except Exception as e:
        print(f"连接异常: {e}")
    finally:
        print("客户端断开连接")
def main():
    """Start the synchronous echo server on localhost:8080."""
    host, port = "localhost", 8080
    # The synchronous serve() is used as a regular context manager.
    with serve(echo_handler, host, port) as server:
        print("WebSocket 服务器运行在 ws://localhost:8080")
        server.serve_forever()


if __name__ == "__main__":
    main()
threading 实现的 API 与 asyncio 实现几乎相同,主要区别:
- 使用 for 而不是 async for 遍历消息
- 使用 websocket.send() 而不是 await websocket.send()
- 不需要 asyncio.run() 或 async def
asyncio 客户端
websockets 也提供了客户端实现:
import asyncio
from websockets.asyncio.client import connect
async def hello():
    """Connect to the local server, send one greeting and print the reply."""
    greeting = "Hello, Server!"
    async with connect("ws://localhost:8080") as websocket:
        await websocket.send(greeting)
        # Wait for a single message from the server.
        message = await websocket.recv()
        print(f"收到响应: {message}")


asyncio.run(hello())
threading 客户端
from websockets.sync.client import connect
def hello():
    """Connect synchronously, send one greeting and print the reply."""
    greeting = "Hello, Server!"
    with connect("ws://localhost:8080") as websocket:
        websocket.send(greeting)
        message = websocket.recv()
        print(f"收到响应: {message}")


hello()
获取客户端信息
import asyncio
import websockets
async def handler(websocket):
    """Print the peer address, request path and Origin header, then echo."""
    remote_addr = websocket.remote_address
    request = websocket.request
    path, headers = request.path, request.headers

    print(f"客户端地址: {remote_addr}")
    print(f"请求路径: {path}")
    print(f"Origin: {headers.get('Origin')}")

    async for message in websocket:
        reply = f"Echo: {message}"
        await websocket.send(reply)
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


asyncio.run(main())
连接参数
通过查询参数传递信息:
import asyncio
import websockets
from urllib.parse import parse_qs, urlsplit


async def handler(websocket):
    """Read ``room`` and ``username`` from the query string, then relay.

    Missing parameters fall back to 'general' and '匿名'. Every incoming
    message is sent back prefixed with the room and the username.
    """
    # request.path is a plain string that still contains the query string
    # (e.g. "/?room=dev&username=amy"), so split it before parsing.
    path = websocket.request.path
    query = parse_qs(urlsplit(path).query)

    # parse_qs maps each key to a list of values; take the first one.
    room = query.get('room', ['general'])[0]
    username = query.get('username', ['匿名'])[0]

    print(f"用户 {username} 加入房间 {room}")

    async for message in websocket:
        await websocket.send(f"[{room}] {username}: {message}")
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


asyncio.run(main())
发送不同类型消息
import asyncio
import websockets
import json
async def handler(websocket):
await websocket.send("文本消息")
await websocket.send(json.dumps({
"type": "greeting",
"message": "欢迎连接"
}))
await websocket.send(b"二进制数据")
data = bytes([1, 2, 3, 4, 5])
await websocket.send(data)
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


asyncio.run(main())
连接管理
实现连接管理器:
import asyncio
import websockets
from typing import Dict, Set
class ConnectionManager:
    """Registry of live connections keyed by a generated string id."""

    def __init__(self):
        # connection_id -> connection object (only its async send() is used).
        self.connections: Dict[str, "websockets.asyncio.server.ServerConnection"] = {}
        self._id_counter = 0

    async def add(self, websocket) -> str:
        """Register ``websocket`` and return its newly assigned id."""
        self._id_counter += 1
        connection_id = str(self._id_counter)
        self.connections[connection_id] = websocket
        print(f"连接 {connection_id} 已添加,当前连接数: {len(self.connections)}")
        return connection_id

    def remove(self, connection_id: str):
        """Forget ``connection_id``; unknown ids are ignored."""
        if connection_id in self.connections:
            del self.connections[connection_id]
            print(f"连接 {connection_id} 已移除,当前连接数: {len(self.connections)}")

    async def send_to(self, connection_id: str, message: str) -> bool:
        """Send ``message`` to one connection.

        Returns True if the id was registered and the send was attempted,
        False if the id is unknown.
        """
        ws = self.connections.get(connection_id)
        if ws is None:
            return False
        await ws.send(message)
        return True

    async def broadcast(self, message: str, exclude: str = None):
        """Send ``message`` to every connection except ``exclude``."""
        # Iterate over a snapshot: each ``await`` yields to the event loop,
        # where other handlers may add or remove connections. Iterating the
        # live dict would then raise "dictionary changed size during iteration".
        for conn_id, ws in list(self.connections.items()):
            if conn_id == exclude:
                continue
            try:
                await ws.send(message)
            except websockets.exceptions.ConnectionClosed:
                # Best effort: a closed peer must not abort the broadcast.
                pass
manager = ConnectionManager()


async def handler(websocket):
    """Register the connection and relay its messages to all other clients."""
    connection_id = await manager.add(websocket)
    try:
        async for message in websocket:
            outgoing = f"用户 {connection_id}: {message}"
            # The sender is excluded from its own broadcast.
            await manager.broadcast(outgoing, exclude=connection_id)
    finally:
        # Runs however the loop ends, so the registry entry is removed.
        manager.remove(connection_id)
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


asyncio.run(main())
心跳检测
import asyncio
import websockets
async def handler(websocket):
    """Echo messages while a background task sends periodic "ping" texts."""
    ping_task = asyncio.create_task(send_ping(websocket))
    try:
        async for message in websocket:
            # "pong" replies to the application-level ping are not echoed.
            if message != "pong":
                await websocket.send(f"Echo: {message}")
    finally:
        # Stop the ping task however the receive loop ends.
        ping_task.cancel()
async def send_ping(websocket):
    """Send the text "ping" every 30 seconds until the connection closes."""
    while True:
        await asyncio.sleep(30)
        try:
            await websocket.send("ping")
        except websockets.exceptions.ConnectionClosed:
            # The peer is gone; end the task quietly.
            return
async def main():
    """Serve ``handler`` on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


asyncio.run(main())
聊天室示例
import asyncio
import websockets
import json
from typing import Dict, Set
from datetime import datetime
class ChatRoom:
    """A named chat room holding its members and recent message history."""

    def __init__(self, name: str):
        self.name = name
        # client_id -> {'websocket': connection, 'username': str}
        self.clients: Dict[str, dict] = {}
        # Broadcast messages, capped at the 100 most recent.
        self.history: list = []

    async def join(self, client_id: str, websocket, username: str):
        """Add a client, replay up to 50 past messages, announce the join."""
        self.clients[client_id] = {
            'websocket': websocket,
            'username': username
        }

        for msg in self.history[-50:]:
            await websocket.send(json.dumps(msg))

        await self.broadcast({
            'type': 'system',
            'content': f'{username} 加入了聊天室',
            'timestamp': datetime.now().isoformat()
        })

    def leave(self, client_id: str):
        """Remove a client; return its username, or None if it was unknown."""
        client_info = self.clients.pop(client_id, None)
        if client_info is None:
            return None
        return client_info['username']

    async def broadcast(self, message: dict, exclude: str = None):
        """Record ``message`` and send it to every member except ``exclude``."""
        self.history.append(message)
        # Keep only the newest 100 entries.
        del self.history[:-100]

        data = json.dumps(message)
        # Iterate over a snapshot: members may join or leave while a send is
        # awaited, and mutating the live dict mid-iteration raises RuntimeError.
        for client_id, client_info in list(self.clients.items()):
            if client_id == exclude:
                continue
            try:
                await client_info['websocket'].send(data)
            except websockets.exceptions.ConnectionClosed:
                # Best effort: skip closed peers. A bare ``except`` here
                # would also swallow task cancellation.
                pass
class ChatServer:
    """Holds all chat rooms and hands out sequential client ids."""

    def __init__(self):
        self._id_counter = 0
        # room name -> ChatRoom, created on first use
        self.rooms: Dict[str, ChatRoom] = {}
        # client id -> name of the room the client joined
        self.client_rooms: Dict[str, str] = {}

    def get_room(self, name: str) -> ChatRoom:
        """Return the room called ``name``, creating it if necessary."""
        try:
            return self.rooms[name]
        except KeyError:
            room = self.rooms[name] = ChatRoom(name)
            return room

    def generate_id(self) -> str:
        """Return the next client id as a decimal string ("1", "2", ...)."""
        self._id_counter += 1
        return str(self._id_counter)
chat_server = ChatServer()


async def _leave_room(room, client_id):
    """Remove ``client_id`` from ``room`` and announce it to the others."""
    left_username = room.leave(client_id)
    if left_username:
        await room.broadcast({
            'type': 'system',
            'content': f'{left_username} 离开了聊天室',
            'timestamp': datetime.now().isoformat()
        })


async def handler(websocket):
    """Serve one chat client speaking a small JSON protocol.

    Each message must be a JSON object. ``{"action": "join", ...}`` enters a
    room (leaving the previous one first); ``{"action": "message", ...}``
    relays ``content`` to the other members of the current room.
    """
    client_id = chat_server.generate_id()
    current_room = None
    username = "匿名"

    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                data = None

            # Valid JSON that is not an object (e.g. a list or a number) has
            # no .get(), so it is rejected the same way as malformed JSON.
            if not isinstance(data, dict):
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': '无效的 JSON 格式'
                }))
                continue

            action = data.get('action')

            if action == 'join':
                # Leave the previous room first; otherwise the client stays
                # registered there and keeps receiving its messages.
                if current_room is not None:
                    await _leave_room(current_room, client_id)

                room_name = data.get('room', 'general')
                username = data.get('username', '匿名')

                current_room = chat_server.get_room(room_name)
                chat_server.client_rooms[client_id] = room_name

                await current_room.join(client_id, websocket, username)

            elif action == 'message' and current_room:
                await current_room.broadcast({
                    'type': 'message',
                    'username': username,
                    'content': data.get('content', ''),
                    'timestamp': datetime.now().isoformat()
                }, exclude=client_id)

    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # Drop the bookkeeping entry so it does not accumulate per client.
        chat_server.client_rooms.pop(client_id, None)
        if current_room:
            await _leave_room(current_room, client_id)
async def main():
    """Serve the chat ``handler`` on localhost:8080 until stopped."""
    async with websockets.serve(handler, "localhost", 8080):
        print("聊天服务器运行在 ws://localhost:8080")
        # Nothing ever resolves this future, so awaiting it blocks forever.
        forever = asyncio.Future()
        await forever


if __name__ == "__main__":
    asyncio.run(main())
使用 FastAPI
FastAPI 内置 WebSocket 支持,可以方便地构建 WebSocket 服务。
基本用法
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept the connection and echo every text message back."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client disconnected; this is how the loop normally ends.
        # Other exceptions are left to propagate instead of being hidden.
        pass


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
路径参数
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
    """Echo text messages prefixed with the ``room_name`` path parameter."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room_name}] {data}")
    except WebSocketDisconnect:
        # The client disconnected; this is how the loop normally ends.
        # Other exceptions are left to propagate instead of being hidden.
        pass
查询参数和依赖注入
from fastapi import (
    Depends,
    FastAPI,
    Query,
    WebSocket,
    WebSocketDisconnect,
    WebSocketException,
    status,
)

app = FastAPI()


async def get_token(token: str = Query(...)):
    """Dependency that validates the required ``token`` query parameter.

    Returns the token when it is accepted.

    Raises:
        WebSocketException: with close code 1008 when the token is wrong.
    """
    if token != "secret":
        # WebSocketException is the exception meant for rejecting a
        # connection; WebSocketDisconnect signals that the peer went away.
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
    return token
@app.websocket("/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    token: str = Depends(get_token)
):
    """Echo text messages together with the token from ``get_token``."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            reply = f"Token: {token}, Message: {data}"
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        print("客户端断开连接")
消息类型处理
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Union
import json
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handle text and binary frames using the raw ``receive()`` message.

    Text frames are acknowledged with a text reply, binary frames are sent
    back unchanged, and a disconnect message ends the loop.
    """
    await websocket.accept()
    try:
        while True:
            message = await websocket.receive()

            if message.get("type") == "websocket.disconnect":
                break

            # ASGI allows "text" and "bytes" to be present with the unused
            # one set to None, so test the value rather than the key.
            text = message.get("text")
            data = message.get("bytes")

            if text is not None:
                print(f"文本消息: {text}")
                await websocket.send_text(f"收到文本: {text}")
            elif data is not None:
                print(f"二进制消息: {len(data)} 字节")
                await websocket.send_bytes(data)
    except WebSocketDisconnect:
        print("客户端断开")
@app.websocket("/ws/json")
async def json_endpoint(websocket: WebSocket):
    """Receive JSON messages and reply with a JSON acknowledgement."""
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            print(f"JSON 消息: {data}")
            # NOTE(review): the timestamp is a fixed placeholder string.
            reply = {
                "received": data,
                "timestamp": "2024-01-01T00:00:00"
            }
            await websocket.send_json(reply)
    except WebSocketDisconnect:
        return
连接管理器
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
app = FastAPI()
class ConnectionManager:
    """Tracks accepted WebSocket connections and broadcasts text to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``; a repeated call is a no-op."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every tracked connection (best effort)."""
        # Iterate over a copy: connections may be added or removed while a
        # send is awaited, and mutating the list mid-iteration skips entries.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # One failing peer must not stop the broadcast. ``Exception``
                # (not a bare ``except``) lets task cancellation propagate.
                continue
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    """Relay each text message from this client to every connection."""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            outgoing = f"用户 {client_id}: {data}"
            await manager.broadcast(outgoing)
    except WebSocketDisconnect:
        # Unregister first so the farewell is not sent to the departed peer.
        manager.disconnect(websocket)
        farewell = f"用户 {client_id} 离开了聊天室"
        await manager.broadcast(farewell)
完整聊天室示例
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List
from datetime import datetime
import json
app = FastAPI()
class ChatRoom:
    """A named chat room holding its connections and recent messages."""

    def __init__(self, name: str):
        self.name = name
        # client_id -> {'websocket': WebSocket, 'username': str}
        self.connections: Dict[int, dict] = {}
        # Broadcast messages, capped at the 100 most recent.
        self.messages: List[dict] = []

    async def join(self, client_id: int, websocket: WebSocket, username: str):
        """Accept the socket, replay up to 50 messages, announce the join."""
        await websocket.accept()
        self.connections[client_id] = {
            'websocket': websocket,
            'username': username
        }

        for msg in self.messages[-50:]:
            await websocket.send_json(msg)

        await self.broadcast({
            'type': 'system',
            'content': f'{username} 加入了聊天室',
            'timestamp': datetime.now().isoformat()
        }, exclude=client_id)

    async def leave(self, client_id: int):
        """Remove a client and announce it; unknown ids are ignored."""
        conn = self.connections.pop(client_id, None)
        if conn is None:
            return
        username = conn['username']
        await self.broadcast({
            'type': 'system',
            'content': f'{username} 离开了聊天室',
            'timestamp': datetime.now().isoformat()
        })

    async def broadcast(self, message: dict, exclude: int = None):
        """Record ``message`` and send it to every client except ``exclude``."""
        self.messages.append(message)
        # Keep only the newest 100 entries.
        del self.messages[:-100]

        # Iterate over a snapshot: clients may join or leave while a send is
        # awaited, and mutating the live dict mid-iteration raises RuntimeError.
        for client_id, conn in list(self.connections.items()):
            if client_id == exclude:
                continue
            try:
                await conn['websocket'].send_json(message)
            except Exception:
                # One failing peer must not stop the broadcast. ``Exception``
                # (not a bare ``except``) lets task cancellation propagate.
                continue
class ChatManager:
    """Creates chat rooms on demand and hands out sequential client ids."""

    def __init__(self):
        self.client_counter = 0
        # room name -> ChatRoom, created on first use
        self.rooms: Dict[str, ChatRoom] = {}

    def get_room(self, name: str) -> ChatRoom:
        """Return the room called ``name``, creating it if necessary."""
        try:
            return self.rooms[name]
        except KeyError:
            room = self.rooms[name] = ChatRoom(name)
            return room

    def get_client_id(self) -> int:
        """Return the next client id (1, 2, 3, ...)."""
        self.client_counter += 1
        return self.client_counter
chat_manager = ChatManager()


@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
    """Join ``room_name`` and relay this client's JSON messages.

    A message with ``type == 'set_username'`` renames the client; any other
    message is broadcast to the room with its ``content`` field.
    """
    client_id = chat_manager.get_client_id()
    room = chat_manager.get_room(room_name)
    username = f"用户{client_id}"

    try:
        await room.join(client_id, websocket, username)

        while True:
            data = await websocket.receive_json()

            if data.get('type') != 'set_username':
                await room.broadcast({
                    'type': 'message',
                    'username': username,
                    'content': data.get('content', ''),
                    'timestamp': datetime.now().isoformat()
                })
                continue

            username = data.get('username', username)
            room.connections[client_id]['username'] = username

    except Exception as e:
        # A disconnect is the normal way out; anything else is reported.
        if not isinstance(e, WebSocketDisconnect):
            print(f"错误: {e}")
        await room.leave(client_id)
# Static demo page returned by the "/" route. Its script opens a WebSocket to
# ws://localhost:8080/ws/general, appends each received JSON message to the
# #messages element, and sends {"content": ...} from the input box.
html = """
<!DOCTYPE html>
<html>
<head>
<title>聊天室</title>
<style>
body { font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; }
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.system { color: #666; font-style: italic; }
input { width: 70%; padding: 10px; }
button { padding: 10px 20px; }
</style>
</head>
<body>
<h1>聊天室</h1>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
<script>
const roomName = 'general';
const ws = new WebSocket(`ws://localhost:8080/ws/${roomName}`);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = 'message ' + (data.type === 'system' ? 'system' : '');
messageDiv.textContent = data.type === 'system'
? data.content
: `${data.username}: ${data.content}`;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
function sendMessage() {
const input = document.getElementById('messageInput');
if (input.value) {
ws.send(JSON.stringify({ content: input.value }));
input.value = '';
}
}
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') sendMessage();
});
</script>
</body>
</html>
"""
@app.get("/")
async def get():
    """Serve the chat demo page."""
    page = HTMLResponse(html)
    return page


if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8080)
使用 Django Channels
Django Channels 扩展了 Django 的异步能力,支持 WebSocket。
安装
pip install channels
配置
# settings.py
INSTALLED_APPS = [
    'channels',
    ...  # placeholder for the project's other apps
]

ASGI_APPLICATION = 'myproject.asgi.application'

# Consumers that call self.channel_layer.group_add / group_send need a
# configured channel layer. The in-memory layer works only within a single
# process and is meant for local development.
# NOTE(review): use a shared backend (e.g. channels_redis) in production.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
    },
}
创建 Consumer
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
    """Chat consumer that relays messages to everyone in the same room group."""

    async def connect(self):
        """Join the group named after the URL's ``room_name`` and accept."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        """Leave the room group."""
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        """Forward the ``message`` field of a JSON text frame to the group.

        Binary frames and payloads that are not a JSON object with a
        ``message`` key are ignored instead of crashing the consumer.
        """
        if text_data is None:
            # Binary frame: nothing to relay.
            return

        try:
            message = json.loads(text_data)['message']
        except (json.JSONDecodeError, KeyError, TypeError):
            # Malformed JSON, missing key, or a non-object payload.
            return

        await self.channel_layer.group_send(
            self.room_group_name,
            {
                # 'type' names the handler method invoked on each consumer.
                'type': 'chat_message',
                'message': message
            }
        )

    async def chat_message(self, event):
        """Send a group event's message to this consumer's client as JSON."""
        message = event['message']

        await self.send(text_data=json.dumps({
            'message': message
        }))
路由配置
from django.urls import re_path
from . import consumers
# WebSocket URL patterns. The named group ``room_name`` is what the consumer
# reads from scope['url_route']['kwargs']; as_asgi() wraps the consumer class
# as an ASGI application.
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
小结
本章介绍了 Python 中实现 WebSocket 服务器的三种主要方式:
- websockets 库:轻量级、基于 asyncio、适合纯 WebSocket 服务
- FastAPI:现代 Web 框架、内置 WebSocket 支持、适合 REST API + WebSocket 混合场景
- Django Channels:Django 扩展、支持 WebSocket、适合 Django 项目
选择建议:
- 纯 WebSocket 服务:选择 websockets
- REST API + WebSocket:选择 FastAPI
- Django 项目:选择 Django Channels