FastAPI 教程

10.4 WebSocket 连接管理

FastAPI WebSocket 连接管理:从入门到实战

FastAPI 教程

本教程详细讲解如何在FastAPI中使用WebSocket进行连接管理,涵盖建立、维护和断开连接的完整流程,适合初学者快速上手实现实时应用。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI WebSocket 连接管理教程

介绍

WebSocket 是一种全双工通信协议,允许客户端和服务器之间建立持久连接,实现实时数据交换。在 FastAPI 中,WebSocket 支持使得构建聊天应用、实时通知等场景变得简单。本教程将重点介绍 WebSocket 连接管理,帮助你从零开始掌握如何在 FastAPI 中高效管理 WebSocket 连接。

前置条件

  • 安装 Python 3.7 或更高版本。
  • 安装 FastAPI 和 Uvicorn(一个 ASGI 服务器)。
  • 通过 pip 安装:pip install fastapi uvicorn

设置 FastAPI WebSocket

FastAPI 基于 ASGI 标准,原生支持 WebSocket。首先,创建一个简单的 FastAPI 应用并引入 WebSocket 相关模块。

定义 WebSocket 端点

在 FastAPI 中,使用 @app.websocket 装饰器定义 WebSocket 端点。端点路径类似于 HTTP 路由,用于客户端连接。

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

# 一个简单的 HTML 页面用于测试 WebSocket
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket Test</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages');
                var message = document.createElement('li');
                var content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            function sendMessage(event) {
                var input = document.getElementById("messageText");
                ws.send(input.value);
                input.value = '';
                event.preventDefault();
            }
        </script>
    </body>
</html>
"""

@app.get("/")
async def get():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # 这里将实现连接管理
    pass

连接管理详解

WebSocket 连接管理涉及连接的整个生命周期:建立、消息处理、断开。以下是分步详解。

5.1 建立连接

当客户端尝试连接 WebSocket 端点时,服务器需要接受连接。在 FastAPI 中,使用 await websocket.accept() 来建立连接。

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # 接受客户端连接
    # 后续处理

接受连接后,连接已建立,服务器可以开始监听消息。

5.2 接收消息

使用循环来接收来自客户端的消息。await websocket.receive_text() 用于接收文本消息,类似地,还有 receive_bytes() 等。

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        try:
            data = await websocket.receive_text()  # 接收文本消息
            # 处理接收到的消息
            print(f"收到消息: {data}")
        except Exception as e:
            # 处理异常,例如客户端断开连接
            print(f"连接错误: {e}")
            break

5.3 发送消息

使用 await websocket.send_text(message) 向客户端发送消息。可以在接收消息后立即回复,或基于事件发送。

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        try:
            data = await websocket.receive_text()
            print(f"收到消息: {data}")
            # 发送回应
            response = f"服务器回复: {data}"
            await websocket.send_text(response)
        except Exception as e:
            print(f"连接错误: {e}")
            break

5.4 断开连接

连接断开可以通过异常处理或主动关闭。在循环中捕获异常(如 websockets.exceptions.ConnectionClosed)来检测断开。或者使用 await websocket.close() 主动关闭连接。

from websockets.exceptions import ConnectionClosed  # 需要安装 websockets 库: pip install websockets

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print(f"收到消息: {data}")
            await websocket.send_text(f"回复: {data}")
    except ConnectionClosed:
        print("客户端断开连接")
    finally:
        # 可选的清理工作
        print("连接关闭,执行清理")

完整示例:简单聊天室

结合以上部分,创建一个简单的聊天室示例,支持多个客户端连接并广播消息。

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()

# 存储活跃的 WebSocket 连接
connections = []

@app.get("/")
async def get():
    html = """
    <!DOCTYPE html>
    <html>
        <head><title>WebSocket Chat</title></head>
        <body>
            <h1>WebSocket Chat Room</h1>
            <form onsubmit="sendMessage(event)">
                <input type="text" id="messageText" autocomplete="off"/>
                <button>Send</button>
            </form>
            <ul id='messages'></ul>
            <script>
                var ws = new WebSocket("ws://localhost:8000/ws");
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages');
                    var message = document.createElement('li');
                    message.textContent = event.data;
                    messages.appendChild(message);
                };
                function sendMessage(event) {
                    var input = document.getElementById("messageText");
                    ws.send(input.value);
                    input.value = '';
                    event.preventDefault();
                }
            </script>
        </body>
    </html>
    """
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    connections.append(websocket)  # 添加到连接列表
    try:
        while True:
            data = await websocket.receive_text()
            # 广播消息到所有连接的客户端
            for connection in connections:
                try:
                    await connection.send_text(f"用户说: {data}")
                except:
                    # 如果某个连接失败,从列表中移除
                    connections.remove(connection)
    except WebSocketDisconnect:
        connections.remove(websocket)  # 断开时移除
        print("一个客户端断开连接")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

进阶主题

  • 连接池管理: 对于大量连接,考虑使用连接池来优化性能,例如使用 asyncio.Queue 或第三方库。
  • 身份验证: 在连接时验证用户,例如通过查询参数或头信息传递令牌。
  • 错误处理: 添加更多异常处理,处理网络延迟、超时等情况。
  • 广播和群组: 实现群组聊天,将连接分组管理。

总结

本教程详细介绍了在 FastAPI 中管理 WebSocket 连接的全过程,从基础设置到完整示例。通过实践,你可以轻松构建实时应用。记住关键步骤:接受连接、接收和发送消息、处理断开。扩展应用时,考虑连接管理的最佳实践,如内存优化和错误恢复。

继续探索 FastAPI 文档以了解更多高级功能,如依赖注入和后台任务。祝你学习愉快!

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包