Python 服务端实现
Python 提供了多个优秀的 WebSocket 库,本章介绍最常用的 websockets 库和 FastAPI 框架中的 WebSocket 支持。
使用 websockets 库
websockets 是 Python 中最流行的 WebSocket 库,基于 asyncio 实现,性能优异,API 简洁。
安装
pip install websockets
基本服务器
创建一个简单的 WebSocket 服务器:
import asyncio
import websockets
async def handler(websocket):
print("客户端已连接")
try:
async for message in websocket:
print(f"收到消息: {message}")
await websocket.send(f"服务器收到: {message}")
except websockets.exceptions.ConnectionClosed:
print("客户端断开连接")
async def main():
async with websockets.serve(handler, "localhost", 8080):
print("WebSocket 服务器运行在 ws://localhost:8080")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
获取客户端信息
import asyncio
import websockets
async def handler(websocket):
remote_addr = websocket.remote_address
path = websocket.request.path
headers = websocket.request.headers
print(f"客户端地址: {remote_addr}")
print(f"请求路径: {path}")
print(f"Origin: {headers.get('Origin')}")
async for message in websocket:
await websocket.send(f"Echo: {message}")
async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()
asyncio.run(main())
连接参数
通过路径参数传递信息:
import asyncio
import websockets
from urllib.parse import parse_qs
async def handler(websocket):
path = websocket.request.path
query = parse_qs(websocket.request.uri.query)
room = query.get('room', ['general'])[0]
username = query.get('username', ['匿名'])[0]
print(f"用户 {username} 加入房间 {room}")
async for message in websocket:
await websocket.send(f"[{room}] {username}: {message}")
async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()
asyncio.run(main())
发送不同类型消息
import asyncio
import websockets
import json
async def handler(websocket):
await websocket.send("文本消息")
await websocket.send(json.dumps({
"type": "greeting",
"message": "欢迎连接"
}))
await websocket.send(b"二进制数据")
data = bytes([1, 2, 3, 4, 5])
await websocket.send(data)
async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()
asyncio.run(main())
连接管理
实现连接管理器:
import asyncio
import websockets
from typing import Dict, Set
class ConnectionManager:
def __init__(self):
self.connections: Dict[str, websockets.WebSocketServerProtocol] = {}
self._id_counter = 0
async def add(self, websocket) -> str:
self._id_counter += 1
connection_id = str(self._id_counter)
self.connections[connection_id] = websocket
print(f"连接 {connection_id} 已添加,当前连接数: {len(self.connections)}")
return connection_id
def remove(self, connection_id: str):
if connection_id in self.connections:
del self.connections[connection_id]
print(f"连接 {connection_id} 已移除,当前连接数: {len(self.connections)}")
async def send_to(self, connection_id: str, message: str) -> bool:
ws = self.connections.get(connection_id)
if ws:
await ws.send(message)
return True
return False
async def broadcast(self, message: str, exclude: str = None):
for conn_id, ws in self.connections.items():
if conn_id != exclude:
try:
await ws.send(message)
except websockets.exceptions.ConnectionClosed:
pass
manager = ConnectionManager()
async def handler(websocket):
connection_id = await manager.add(websocket)
try:
async for message in websocket:
await manager.broadcast(f"用户 {connection_id}: {message}", exclude=connection_id)
finally:
manager.remove(connection_id)
async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()
asyncio.run(main())
心跳检测
import asyncio
import websockets
async def handler(websocket):
ping_task = asyncio.create_task(send_ping(websocket))
try:
async for message in websocket:
if message == "pong":
continue
await websocket.send(f"Echo: {message}")
finally:
ping_task.cancel()
async def send_ping(websocket):
try:
while True:
await asyncio.sleep(30)
await websocket.send("ping")
except websockets.exceptions.ConnectionClosed:
pass
async def main():
async with websockets.serve(handler, "localhost", 8080):
await asyncio.Future()
asyncio.run(main())
聊天室示例
import asyncio
import websockets
import json
from typing import Dict, Set
from datetime import datetime
class ChatRoom:
def __init__(self, name: str):
self.name = name
self.clients: Dict[str, websockets.WebSocketServerProtocol] = {}
self.history: list = []
async def join(self, client_id: str, websocket, username: str):
self.clients[client_id] = {
'websocket': websocket,
'username': username
}
for msg in self.history[-50:]:
await websocket.send(json.dumps(msg))
await self.broadcast({
'type': 'system',
'content': f'{username} 加入了聊天室',
'timestamp': datetime.now().isoformat()
})
def leave(self, client_id: str):
if client_id in self.clients:
username = self.clients[client_id]['username']
del self.clients[client_id]
return username
return None
async def broadcast(self, message: dict, exclude: str = None):
self.history.append(message)
if len(self.history) > 100:
self.history.pop(0)
data = json.dumps(message)
for client_id, client_info in self.clients.items():
if client_id != exclude:
try:
await client_info['websocket'].send(data)
except:
pass
class ChatServer:
def __init__(self):
self.rooms: Dict[str, ChatRoom] = {}
self.client_rooms: Dict[str, str] = {}
self._id_counter = 0
def get_room(self, name: str) -> ChatRoom:
if name not in self.rooms:
self.rooms[name] = ChatRoom(name)
return self.rooms[name]
def generate_id(self) -> str:
self._id_counter += 1
return str(self._id_counter)
chat_server = ChatServer()
async def handler(websocket):
client_id = chat_server.generate_id()
current_room = None
username = "匿名"
try:
async for message in websocket:
try:
data = json.loads(message)
except json.JSONDecodeError:
await websocket.send(json.dumps({
'type': 'error',
'message': '无效的 JSON 格式'
}))
continue
action = data.get('action')
if action == 'join':
room_name = data.get('room', 'general')
username = data.get('username', '匿名')
current_room = chat_server.get_room(room_name)
chat_server.client_rooms[client_id] = room_name
await current_room.join(client_id, websocket, username)
elif action == 'message' and current_room:
await current_room.broadcast({
'type': 'message',
'username': username,
'content': data.get('content', ''),
'timestamp': datetime.now().isoformat()
}, exclude=client_id)
except websockets.exceptions.ConnectionClosed:
pass
finally:
if current_room:
left_username = current_room.leave(client_id)
if left_username:
await current_room.broadcast({
'type': 'system',
'content': f'{left_username} 离开了聊天室',
'timestamp': datetime.now().isoformat()
})
async def main():
async with websockets.serve(handler, "localhost", 8080):
print("聊天服务器运行在 ws://localhost:8080")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
使用 FastAPI
FastAPI 内置 WebSocket 支持,可以方便地构建 WebSocket 服务。
基本用法
from fastapi import FastAPI, WebSocket
import uvicorn
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except Exception:
pass
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
路径参数
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"[{room_name}] {data}")
except Exception:
pass
查询参数和依赖注入
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends
app = FastAPI()
async def get_token(token: str = Query(...)):
if token != "secret":
raise WebSocketDisconnect(code=1008)
return token
@app.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str = Depends(get_token)
):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Token: {token}, Message: {data}")
except WebSocketDisconnect:
print("客户端断开连接")
消息类型处理
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Union
import json
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
message = await websocket.receive()
if "text" in message:
text = message["text"]
print(f"文本消息: {text}")
await websocket.send_text(f"收到文本: {text}")
elif "bytes" in message:
data = message["bytes"]
print(f"二进制消息: {len(data)} 字节")
await websocket.send_bytes(data)
elif message.get("type") == "websocket.disconnect":
break
except WebSocketDisconnect:
print("客户端断开")
@app.websocket("/ws/json")
async def json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
print(f"JSON 消息: {data}")
await websocket.send_json({
"received": data,
"timestamp": "2024-01-01T00:00:00"
})
except WebSocketDisconnect:
pass
连接管理器
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except:
pass
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"用户 {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"用户 {client_id} 离开了聊天室")
完整聊天室示例
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict, List
from datetime import datetime
import json
app = FastAPI()
class ChatRoom:
def __init__(self, name: str):
self.name = name
self.connections: Dict[int, dict] = {}
self.messages: List[dict] = []
async def join(self, client_id: int, websocket: WebSocket, username: str):
await websocket.accept()
self.connections[client_id] = {
'websocket': websocket,
'username': username
}
for msg in self.messages[-50:]:
await websocket.send_json(msg)
await self.broadcast({
'type': 'system',
'content': f'{username} 加入了聊天室',
'timestamp': datetime.now().isoformat()
}, exclude=client_id)
async def leave(self, client_id: int):
if client_id in self.connections:
username = self.connections[client_id]['username']
del self.connections[client_id]
await self.broadcast({
'type': 'system',
'content': f'{username} 离开了聊天室',
'timestamp': datetime.now().isoformat()
})
async def broadcast(self, message: dict, exclude: int = None):
self.messages.append(message)
if len(self.messages) > 100:
self.messages.pop(0)
for client_id, conn in self.connections.items():
if client_id != exclude:
try:
await conn['websocket'].send_json(message)
except:
pass
class ChatManager:
def __init__(self):
self.rooms: Dict[str, ChatRoom] = {}
self.client_counter = 0
def get_room(self, name: str) -> ChatRoom:
if name not in self.rooms:
self.rooms[name] = ChatRoom(name)
return self.rooms[name]
def get_client_id(self) -> int:
self.client_counter += 1
return self.client_counter
chat_manager = ChatManager()
@app.websocket("/ws/{room_name}")
async def websocket_endpoint(websocket: WebSocket, room_name: str):
client_id = chat_manager.get_client_id()
room = chat_manager.get_room(room_name)
username = f"用户{client_id}"
try:
await room.join(client_id, websocket, username)
while True:
data = await websocket.receive_json()
if data.get('type') == 'set_username':
username = data.get('username', username)
room.connections[client_id]['username'] = username
continue
await room.broadcast({
'type': 'message',
'username': username,
'content': data.get('content', ''),
'timestamp': datetime.now().isoformat()
})
except WebSocketDisconnect:
await room.leave(client_id)
except Exception as e:
print(f"错误: {e}")
await room.leave(client_id)
html = """
<!DOCTYPE html>
<html>
<head>
<title>聊天室</title>
<style>
body { font-family: Arial; max-width: 800px; margin: 0 auto; padding: 20px; }
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ccc; padding: 10px; }
.message { margin: 5px 0; }
.system { color: #666; font-style: italic; }
input { width: 70%; padding: 10px; }
button { padding: 10px 20px; }
</style>
</head>
<body>
<h1>聊天室</h1>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息...">
<button onclick="sendMessage()">发送</button>
<script>
const roomName = 'general';
const ws = new WebSocket(`ws://localhost:8080/ws/${roomName}`);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
const messagesDiv = document.getElementById('messages');
const messageDiv = document.createElement('div');
messageDiv.className = 'message ' + (data.type === 'system' ? 'system' : '');
messageDiv.textContent = data.type === 'system'
? data.content
: `${data.username}: ${data.content}`;
messagesDiv.appendChild(messageDiv);
messagesDiv.scrollTop = messagesDiv.scrollHeight;
};
function sendMessage() {
const input = document.getElementById('messageInput');
if (input.value) {
ws.send(JSON.stringify({ content: input.value }));
input.value = '';
}
}
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') sendMessage();
});
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
使用 Django Channels
Django Channels 扩展了 Django 的异步能力,支持 WebSocket。
安装
pip install channels
配置
# settings.py
INSTALLED_APPS = [
'channels',
...
]
ASGI_APPLICATION = 'myproject.asgi.application'
创建 Consumer
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope['url_route']['kwargs']['room_name']
self.room_group_name = f'chat_{self.room_name}'
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name
)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
await self.channel_layer.group_send(
self.room_group_name,
{
'type': 'chat_message',
'message': message
}
)
async def chat_message(self, event):
message = event['message']
await self.send(text_data=json.dumps({
'message': message
}))
路由配置
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]
小结
本章介绍了 Python 中实现 WebSocket 服务器的三种主要方式:
- websockets 库:轻量级、基于 asyncio、适合纯 WebSocket 服务
- FastAPI:现代 Web 框架、内置 WebSocket 支持、适合 REST API + WebSocket 混合场景
- Django Channels:Django 扩展、支持 WebSocket、适合 Django 项目
选择建议:
- 纯 WebSocket 服务:选择 websockets
- REST API + WebSocket:选择 FastAPI
- Django 项目:选择 Django Channels