FastAPI 教程

23.1 WebSocket 实时通信架构

FastAPI WebSocket 实时协作应用综合实战教程

FastAPI 教程

本教程详细讲解如何使用FastAPI构建一个基于WebSocket的实时协作应用,涵盖从WebSocket基础到完整项目实战,适合FastAPI初学者和开发者轻松上手实时通信开发。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI WebSocket 实时协作应用综合实战教程

引言

在当今数字化时代,实时协作应用如聊天工具、协同编辑等越来越受欢迎。这些应用依赖于WebSocket协议来实现服务器与客户端之间的双向实时通信。FastAPI作为一个现代、快速的Python Web框架,内置了强大的WebSocket支持,使得构建实时应用变得简单高效。本教程将带您从零开始,使用FastAPI构建一个简单的实时聊天应用,深入理解WebSocket实时通信架构。

准备工作

在开始之前,请确保您已安装Python和FastAPI。我们还需要一个异步服务器,推荐使用Uvicorn。运行以下命令安装依赖:

pip install fastapi uvicorn websockets
  • FastAPI:用于构建Web应用。
  • Uvicorn:作为ASGI服务器运行FastAPI应用。
  • websockets:用于处理WebSocket连接(FastAPI内置WebSocket支持,但安装websockets可提供额外功能)。

FastAPI WebSocket 基础

WebSocket是一种在单个TCP连接上提供全双工通信的协议,允许服务器主动向客户端推送数据。在FastAPI中,您可以轻松定义WebSocket端点来接收和发送实时数据。

设置WebSocket端点

在FastAPI中,您可以使用@app.websocket装饰器来定义WebSocket端点。基本结构如下:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # 接受WebSocket连接
    while True:
        data = await websocket.receive_text()  # 接收客户端发送的数据
        # 处理数据,例如广播给其他客户端
        await websocket.send_text(f"Message text was: {data}")  # 发送响应
  • WebSocket.accept():服务器接受客户端的WebSocket连接请求。
  • WebSocket.receive_text():异步接收来自客户端的文本数据。
  • WebSocket.send_text():异步向客户端发送文本数据。

连接管理和广播

对于实时协作应用,通常需要管理多个连接并向所有客户端广播消息。我们可以使用一个全局列表来跟踪连接。

项目实战:构建实时聊天应用

我们将构建一个简单的实时聊天室,支持多个用户同时连接并发送消息,所有连接用户都能实时接收消息。

步骤1:创建FastAPI应用和WebSocket端点

创建一个新文件main.py,并编写以下代码:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

# 存储活跃连接的列表
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # 广播消息给所有连接的用户,包含发送者ID
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
  • ConnectionManager类:管理WebSocket连接,包括连接、断开连接、发送个人消息和广播消息。
  • WebSocket端点:使用路径参数client_id来标识每个客户端。
    • 连接时,调用manager.connect()将新连接添加到列表。
    • 接收消息时,调用manager.broadcast()向所有连接广播消息。
    • 处理断开连接异常WebSocketDisconnect,当客户端断开时从列表中移除并通知其他用户。

步骤2:创建前端界面

为了测试WebSocket功能,我们需要一个简单的前端HTML页面。在static目录下创建index.html(或直接在代码中返回)。这里,为了简化,我们将使用FastAPI的StaticFiles来提供静态文件。

首先,确保创建static文件夹,并在其中添加index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>FastAPI WebSocket Chat</title>
    <script>
        let ws;
        function connect() {
            const clientId = document.getElementById('clientId').value || 'anonymous';
            ws = new WebSocket(`ws://localhost:8000/ws/${clientId}`);
            ws.onopen = () => {
                console.log('Connected to WebSocket');
                document.getElementById('status').textContent = 'Connected';
            };
            ws.onmessage = (event) => {
                const messages = document.getElementById('messages');
                const message = document.createElement('li');
                message.textContent = event.data;
                messages.appendChild(message);
            };
            ws.onclose = () => {
                document.getElementById('status').textContent = 'Disconnected';
            };
        }
        function sendMessage() {
            const input = document.getElementById('messageInput');
            ws.send(input.value);
            input.value = '';
        }
    </script>
</head>
<body>
    <h1>FastAPI WebSocket Chat</h1>
    <div>
        <label for="clientId">Your ID:</label>
        <input type="text" id="clientId" placeholder="Enter your ID">
        <button onclick="connect()">Connect</button>
    </div>
    <p>Status: <span id="status">Disconnected</span></p>
    <div>
        <input type="text" id="messageInput" placeholder="Type a message">
        <button onclick="sendMessage()">Send</button>
    </div>
    <ul id="messages"></ul>
</body>
</html>

main.py中添加静态文件服务:

from fastapi.staticfiles import StaticFiles

app.mount("/", StaticFiles(directory="static", html=True), name="static")

步骤3:运行应用

确保main.pystatic目录在同一文件夹中,然后运行Uvicorn服务器:

uvicorn main:app --reload --host 0.0.0.0 --port 8000

访问http://localhost:8000在浏览器中打开聊天界面。打开多个标签页或不同浏览器,输入不同的ID连接,发送消息,所有连接的用户都会实时接收消息。

高级主题

消息格式

在实际应用中,您可能希望使用JSON格式发送结构化数据。修改代码以接收和发送JSON:

import json

# 在WebSocket端点中
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()  # 接收JSON数据
            message = {"client_id": client_id, "message": data["message"]}
            await manager.broadcast(json.dumps(message))  # 广播JSON字符串
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(json.dumps({"type": "leave", "client_id": client_id}))

处理大量连接

对于高并发场景,考虑使用异步队列或消息代理如Redis来管理连接和消息广播。

测试和部署

测试

使用WebSocket客户端工具测试,如websocat或浏览器开发者工具。对于自动化测试,FastAPI支持测试WebSocket端点。

部署

部署到生产环境时,使用支持WebSocket的服务器如Uvicorn配合Nginx反向代理。确保Nginx配置支持WebSocket升级连接。

总结

通过本教程,您学会了如何使用FastAPI构建一个基于WebSocket的实时协作应用。关键点包括:

  • 设置WebSocket端点并使用ConnectionManager管理多个连接。
  • 实现消息广播功能,支持实时通信。
  • 创建一个简单的前端界面进行测试。

WebSocket是构建实时应用的核心技术,FastAPI使其实现变得简洁高效。您可以将此项目扩展到更复杂的场景,如协同编辑、实时通知等。

下一步

  • 探索FastAPI的更多高级功能,如依赖注入、后台任务。
  • 集成数据库存储聊天历史。
  • 添加身份验证和授权,确保安全通信。

希望本教程能帮助您快速上手FastAPI WebSocket开发!如有疑问,请查阅FastAPI官方文档或社区资源。

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包