FastAPI 教程

23.2 房间管理与状态同步

FastAPI综合项目实战:构建实时协作应用 - 房间管理与状态同步完整教程

FastAPI 教程

本教程从零开始,详细讲解如何使用FastAPI和WebSocket构建一个实时协作应用,重点实现房间管理和状态同步功能。包含代码示例、分步指导和实用建议,适合新手学习FastAPI高级特性。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

综合项目实战:使用FastAPI构建实时协作应用之房间管理与状态同步

介绍

实时协作应用允许用户在共享环境中同步互动,如在线文档编辑或聊天室。FastAPI是一个现代、快速(高性能)的Web框架,基于Python 3.7+,结合WebSocket支持,非常适合构建这类应用。本教程将带你从头构建一个简单的实时协作应用,专注于房间管理和状态同步功能。

项目概览

我们将创建一个Web应用,用户可以创建或加入房间,并在房间内通过WebSocket实时同步状态(例如,文本消息或光标位置)。项目分为两部分:

  • 房间管理:通过REST API处理房间的创建、加入和离开。
  • 状态同步:通过WebSocket实现房间内用户之间的实时数据广播。

环境设置

在开始前,确保你已安装Python 3.7+。然后,创建虚拟环境并安装所需库:

pip install fastapi uvicorn websockets
  • fastapi:用于构建API。
  • uvicorn:ASGI服务器,用于运行FastAPI应用。
  • websockets:用于处理WebSocket连接。

基础项目结构

创建一个新目录real-time-collab-app,并建立以下文件结构:

real-time-collab-app/
├── main.py          # 主应用文件
├── models.py        # 数据模型定义
├── routes/          # API路由
│   ├── rooms.py     # 房间管理路由
│   └── websocket.py # WebSocket路由
└── README.md        # 项目说明

实现房间管理

步骤1:定义数据模型

models.py中,定义房间和用户模型。我们将使用简单的内存存储来简化示例。

# models.py
from pydantic import BaseModel
from typing import Dict, List

class User(BaseModel):
    user_id: str
    username: str

class Room(BaseModel):
    room_id: str
    name: str
    users: List[User] = []  # 存储房间内的用户
    state: Dict = {}        # 房间的共享状态,例如文本内容

# 内存存储示例
rooms_db: Dict[str, Room] = {}

步骤2:创建房间管理API

routes/rooms.py中,实现REST API端点。

# routes/rooms.py
from fastapi import APIRouter, HTTPException
from models import Room, User, rooms_db
import uuid

router = APIRouter(prefix="/rooms", tags=["rooms"])

@router.post("/create", response_model=Room)
async def create_room(name: str):
    """创建新房间"""
    room_id = str(uuid.uuid4())
    room = Room(room_id=room_id, name=name)
    rooms_db[room_id] = room
    return room

@router.post("/{room_id}/join", response_model=Room)
async def join_room(room_id: str, user: User):
    """用户加入房间"""
    if room_id not in rooms_db:
        raise HTTPException(status_code=404, detail="Room not found")
    room = rooms_db[room_id]
    if any(u.user_id == user.user_id for u in room.users):
        raise HTTPException(status_code=400, detail="User already in room")
    room.users.append(user)
    return room

@router.post("/{room_id}/leave")
async def leave_room(room_id: str, user_id: str):
    """用户离开房间"""
    if room_id not in rooms_db:
        raise HTTPException(status_code=404, detail="Room not found")
    room = rooms_db[room_id]
    room.users = [u for u in room.users if u.user_id != user_id]
    return {"message": "User left room"}

@router.get("/{room_id}", response_model=Room)
async def get_room(room_id: str):
    """获取房间信息"""
    if room_id not in rooms_db:
        raise HTTPException(status_code=404, detail="Room not found")
    return rooms_db[room_id]

步骤3:集成路由到主应用

main.py中,注册API路由。

# main.py
from fastapi import FastAPI
from routes import rooms, websocket

app = FastAPI(title="实时协作应用", description="一个房间管理和状态同步的示例应用")

# 注册路由
app.include_router(rooms.router)
app.include_router(websocket.router)  # 稍后添加WebSocket路由

@app.get("/")
async def root():
    return {"message": "欢迎使用实时协作应用!访问 /docs 查看API文档。"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

现在,你可以运行应用测试房间管理API:

uvicorn main:app --reload

访问 http://localhost:8000/docs 查看和测试API。

实现状态同步

步骤4:设置WebSocket路由

routes/websocket.py中,实现WebSocket连接处理。

# routes/websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import Dict, List
import json

router = APIRouter()

class ConnectionManager:
    """管理WebSocket连接"""
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}  # room_id -> 连接列表

    async def connect(self, websocket: WebSocket, room_id: str):
        await websocket.accept()
        if room_id not in self.active_connections:
            self.active_connections[room_id] = []
        self.active_connections[room_id].append(websocket)
        print(f"用户连接到房间 {room_id}")

    def disconnect(self, websocket: WebSocket, room_id: str):
        if room_id in self.active_connections:
            self.active_connections[room_id].remove(websocket)
            if not self.active_connections[room_id]:
                del self.active_connections[room_id]
        print(f"用户断开连接从房间 {room_id}")

    async def broadcast(self, message: str, room_id: str):
        """向房间内所有用户广播消息"""
        if room_id in self.active_connections:
            for connection in self.active_connections[room_id]:
                try:
                    await connection.send_text(message)
                except:
                    pass

manager = ConnectionManager()

@router.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    """WebSocket端点,处理实时通信"""
    await manager.connect(websocket, room_id)
    try:
        while True:
            data = await websocket.receive_text()
            # 假设客户端发送JSON数据,例如 {"type": "update", "state": {...}}
            message = json.loads(data)
            # 广播消息给同一房间的所有用户
            await manager.broadcast(json.dumps(message), room_id)
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_id)
        # 可选:通知其他用户该用户离开
        await manager.broadcast(json.dumps({"type": "user_left", "user_id": "some_id"}), room_id)

更新main.py以包含WebSocket路由:确保已导入并注册。

步骤5:前端示例(可选)

为了测试,你可以创建一个简单的HTML文件来连接WebSocket。这里是一个基本示例,保存为client.html

<!-- client.html -->
<!DOCTYPE html>
<html>
<head>
    <title>实时协作测试</title>
</head>
<body>
    <h2>房间状态同步</h2>
    <input type="text" id="roomId" placeholder="输入房间ID">
    <button onclick="connect()">连接</button>
    <div id="messages"></div>
    <input type="text" id="messageInput" placeholder="输入消息">
    <button onclick="sendMessage()">发送</button>
    <script>
        let ws;
        function connect() {
            const roomId = document.getElementById('roomId').value;
            ws = new WebSocket(`ws://localhost:8000/ws/${roomId}`);
            ws.onmessage = function(event) {
                const data = JSON.parse(event.data);
                document.getElementById('messages').innerHTML += `<p>${JSON.stringify(data)}</p>`;
            };
            ws.onopen = function() {
                alert("连接到房间 " + roomId);
            };
        }
        function sendMessage() {
            const message = document.getElementById('messageInput').value;
            if (ws) {
                ws.send(JSON.stringify({type: "message", content: message}));
            }
        }
    </script>
</body>
</html>

运行应用并在浏览器中打开client.html来测试WebSocket功能。

总结与扩展

  • 已实现功能:通过REST API管理房间,通过WebSocket实现实时状态同步。
  • 优化建议
    • 使用数据库(如SQLite或PostgreSQL)持久化房间和用户数据。
    • 添加认证和授权,确保只有授权用户才能加入房间。
    • 实现更复杂的状态同步逻辑,如冲突解决(使用OT或CRDT算法)。
    • 部署到云平台(如Heroku或AWS)并配置SSL支持WebSocket安全连接。
  • 学习要点:通过这个项目,你学会了FastAPI的基础API设计、WebSocket集成以及实时应用的核心概念。

现在,你可以扩展此应用,添加更多功能,如文件共享或实时白板。继续探索FastAPI文档以了解更多高级特性,如依赖注入和测试。

祝你学习愉快!如有问题,请参考FastAPI官方文档或社区资源。

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包