23.2 房间管理与状态同步
FastAPI综合项目实战:构建实时协作应用 - 房间管理与状态同步完整教程
本教程从零开始,详细讲解如何使用FastAPI和WebSocket构建一个实时协作应用,重点实现房间管理和状态同步功能。包含代码示例、分步指导和实用建议,适合新手学习FastAPI高级特性。
综合项目实战:使用FastAPI构建实时协作应用之房间管理与状态同步
介绍
实时协作应用允许用户在共享环境中同步互动,如在线文档编辑或聊天室。FastAPI是一个现代、快速(高性能)的Web框架,基于Python 3.7+,结合WebSocket支持,非常适合构建这类应用。本教程将带你从头构建一个简单的实时协作应用,专注于房间管理和状态同步功能。
项目概览
我们将创建一个Web应用,用户可以创建或加入房间,并在房间内通过WebSocket实时同步状态(例如,文本消息或光标位置)。项目分为两部分:
- 房间管理:通过REST API处理房间的创建、加入和离开。
- 状态同步:通过WebSocket实现房间内用户之间的实时数据广播。
环境设置
在开始前,确保你已安装Python 3.7+。然后,创建虚拟环境并安装所需库:
pip install fastapi uvicorn websockets
fastapi:用于构建API。uvicorn:ASGI服务器,用于运行FastAPI应用。websockets:用于处理WebSocket连接。
基础项目结构
创建一个新目录real-time-collab-app,并建立以下文件结构:
real-time-collab-app/
├── main.py # 主应用文件
├── models.py # 数据模型定义
├── routes/ # API路由
│ ├── rooms.py # 房间管理路由
│ └── websocket.py # WebSocket路由
└── README.md # 项目说明
实现房间管理
步骤1:定义数据模型
在models.py中,定义房间和用户模型。我们将使用简单的内存存储来简化示例。
# models.py
from pydantic import BaseModel
from typing import Dict, List
class User(BaseModel):
user_id: str
username: str
class Room(BaseModel):
room_id: str
name: str
users: List[User] = [] # 存储房间内的用户
state: Dict = {} # 房间的共享状态,例如文本内容
# 内存存储示例
rooms_db: Dict[str, Room] = {}
步骤2:创建房间管理API
在routes/rooms.py中,实现REST API端点。
# routes/rooms.py
from fastapi import APIRouter, HTTPException
from models import Room, User, rooms_db
import uuid
router = APIRouter(prefix="/rooms", tags=["rooms"])
@router.post("/create", response_model=Room)
async def create_room(name: str):
"""创建新房间"""
room_id = str(uuid.uuid4())
room = Room(room_id=room_id, name=name)
rooms_db[room_id] = room
return room
@router.post("/{room_id}/join", response_model=Room)
async def join_room(room_id: str, user: User):
"""用户加入房间"""
if room_id not in rooms_db:
raise HTTPException(status_code=404, detail="Room not found")
room = rooms_db[room_id]
if any(u.user_id == user.user_id for u in room.users):
raise HTTPException(status_code=400, detail="User already in room")
room.users.append(user)
return room
@router.post("/{room_id}/leave")
async def leave_room(room_id: str, user_id: str):
"""用户离开房间"""
if room_id not in rooms_db:
raise HTTPException(status_code=404, detail="Room not found")
room = rooms_db[room_id]
room.users = [u for u in room.users if u.user_id != user_id]
return {"message": "User left room"}
@router.get("/{room_id}", response_model=Room)
async def get_room(room_id: str):
"""获取房间信息"""
if room_id not in rooms_db:
raise HTTPException(status_code=404, detail="Room not found")
return rooms_db[room_id]
步骤3:集成路由到主应用
在main.py中,注册API路由。
# main.py
from fastapi import FastAPI
from routes import rooms, websocket
app = FastAPI(title="实时协作应用", description="一个房间管理和状态同步的示例应用")
# 注册路由
app.include_router(rooms.router)
app.include_router(websocket.router) # 稍后添加WebSocket路由
@app.get("/")
async def root():
return {"message": "欢迎使用实时协作应用!访问 /docs 查看API文档。"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
现在,你可以运行应用测试房间管理API:
uvicorn main:app --reload
访问 http://localhost:8000/docs 查看和测试API。
实现状态同步
步骤4:设置WebSocket路由
在routes/websocket.py中,实现WebSocket连接处理。
# routes/websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import Dict, List
import json
router = APIRouter()
class ConnectionManager:
"""管理WebSocket连接"""
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {} # room_id -> 连接列表
async def connect(self, websocket: WebSocket, room_id: str):
await websocket.accept()
if room_id not in self.active_connections:
self.active_connections[room_id] = []
self.active_connections[room_id].append(websocket)
print(f"用户连接到房间 {room_id}")
def disconnect(self, websocket: WebSocket, room_id: str):
if room_id in self.active_connections:
self.active_connections[room_id].remove(websocket)
if not self.active_connections[room_id]:
del self.active_connections[room_id]
print(f"用户断开连接从房间 {room_id}")
async def broadcast(self, message: str, room_id: str):
"""向房间内所有用户广播消息"""
if room_id in self.active_connections:
for connection in self.active_connections[room_id]:
try:
await connection.send_text(message)
except:
pass
manager = ConnectionManager()
@router.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
"""WebSocket端点,处理实时通信"""
await manager.connect(websocket, room_id)
try:
while True:
data = await websocket.receive_text()
# 假设客户端发送JSON数据,例如 {"type": "update", "state": {...}}
message = json.loads(data)
# 广播消息给同一房间的所有用户
await manager.broadcast(json.dumps(message), room_id)
except WebSocketDisconnect:
manager.disconnect(websocket, room_id)
# 可选:通知其他用户该用户离开
await manager.broadcast(json.dumps({"type": "user_left", "user_id": "some_id"}), room_id)
更新main.py以包含WebSocket路由:确保已导入并注册。
步骤5:前端示例(可选)
为了测试,你可以创建一个简单的HTML文件来连接WebSocket。这里是一个基本示例,保存为client.html:
<!-- client.html -->
<!DOCTYPE html>
<html>
<head>
<title>实时协作测试</title>
</head>
<body>
<h2>房间状态同步</h2>
<input type="text" id="roomId" placeholder="输入房间ID">
<button onclick="connect()">连接</button>
<div id="messages"></div>
<input type="text" id="messageInput" placeholder="输入消息">
<button onclick="sendMessage()">发送</button>
<script>
let ws;
function connect() {
const roomId = document.getElementById('roomId').value;
ws = new WebSocket(`ws://localhost:8000/ws/${roomId}`);
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
document.getElementById('messages').innerHTML += `<p>${JSON.stringify(data)}</p>`;
};
ws.onopen = function() {
alert("连接到房间 " + roomId);
};
}
function sendMessage() {
const message = document.getElementById('messageInput').value;
if (ws) {
ws.send(JSON.stringify({type: "message", content: message}));
}
}
</script>
</body>
</html>
运行应用并在浏览器中打开client.html来测试WebSocket功能。
总结与扩展
- 已实现功能:通过REST API管理房间,通过WebSocket实现实时状态同步。
- 优化建议:
- 使用数据库(如SQLite或PostgreSQL)持久化房间和用户数据。
- 添加认证和授权,确保只有授权用户才能加入房间。
- 实现更复杂的状态同步逻辑,如冲突解决(使用OT或CRDT算法)。
- 部署到云平台(如Heroku或AWS)并配置SSL支持WebSocket安全连接。
- 学习要点:通过这个项目,你学会了FastAPI的基础API设计、WebSocket集成以及实时应用的核心概念。
现在,你可以扩展此应用,添加更多功能,如文件共享或实时白板。继续探索FastAPI文档以了解更多高级特性,如依赖注入和测试。
祝你学习愉快!如有问题,请参考FastAPI官方文档或社区资源。