FastAPI 教程

10.2 WebSocket 端点定义

FastAPI WebSocket端点定义教程:详细指南与代码示例

FastAPI 教程

本教程详细讲解如何在FastAPI中定义WebSocket端点,包括基础概念、代码实现、进阶用法,并附有完整示例,帮助开发者快速上手WebSocket实时通信。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI WebSocket端点定义教程

WebSocket是一种在网络客户端和服务器之间建立双向通信的协议,常用于实时应用,如聊天应用、游戏、实时数据推送等。与传统的HTTP请求不同,WebSocket允许在单个连接上持续发送和接收数据,避免了轮询的开销。FastAPI作为一个现代的Web框架,内置了WebSocket支持,使定义端点变得简单高效。

什么是WebSocket?

WebSocket协议定义在RFC 6455中,它提供了一个持久的连接,客户端和服务器可以随时发送消息,无需重新建立连接。这与HTTP的请求-响应模式不同,HTTP每次请求都需要新建连接,而WebSocket保持连接开放,直到任何一方关闭它。

主要特点:

  • 双向通信:服务器和客户端都可以主动发送消息。
  • 低延迟:由于连接持久,减少了握手开销。
  • 全双工:消息可以同时发送和接收。

为什么在FastAPI中使用WebSocket?

FastAPI基于Starlette,后者支持异步WebSocket处理,使得在Python中构建高性能的WebSocket应用变得容易。使用FastAPI定义WebSocket端点,你可以:

  • 集成到现有FastAPI项目中,统一路由和依赖管理。
  • 利用依赖注入系统,处理身份验证、数据库连接等。
  • 保持代码简洁和可维护性。

如何定义WebSocket端点?

在FastAPI中,使用@app.websocket()装饰器来定义WebSocket端点。这个装饰器接受一个路径字符串作为参数,类似于HTTP路由。WebSocket端点的处理函数是一个异步函数,接收一个WebSocket对象作为参数,通常命名为websocket,并通过await websocket.accept()接受连接。

基本步骤:

  1. 导入FastAPI和WebSocket类:从fastapi模块导入。
  2. 创建FastAPI实例
  3. 使用装饰器定义端点
  4. 实现处理函数:接受连接、接收消息、发送响应、处理关闭。

代码示例:简单聊天端点

以下是一个完整的FastAPI应用,定义了一个WebSocket端点,实现基本的双向消息传递。

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # 接受WebSocket连接
    await websocket.accept()
    try:
        while True:
            # 接收来自客户端的消息,假设为文本类型
            data = await websocket.receive_text()
            # 发送响应回客户端
            await websocket.send_text(f"Message received: {data}")
    except Exception as e:
        # 发生异常时关闭连接,例如客户端断开
        await websocket.close()
        print(f"Connection closed: {e}")

代码解释:

  • @app.websocket("/ws"):定义一个WebSocket端点,路径为/ws。客户端可以通过ws://localhost:8000/ws连接。
  • 异步函数:函数必须用async def定义,因为WebSocket操作是异步的。
  • websocket: WebSocket参数:FastAPI会自动注入WebSocket对象,用于管理连接。
  • await websocket.accept():接受连接,这是WebSocket握手后的第一步。
  • 循环接收消息:使用while True循环持续接收消息。websocket.receive_text()等待文本消息;还有其他方法如receive_bytes()receive_json()处理不同类型。
  • 发送响应:使用websocket.send_text()发送消息回客户端。
  • 异常处理:在try-except块中,处理断开连接等异常,确保资源释放。

进阶主题

1. 处理不同类型的消息

WebSocket可以发送和接收文本、二进制或JSON数据。FastAPI提供相应方法:

  • receive_text():接收字符串。
  • receive_bytes():接收字节数据。
  • receive_json():接收并解析JSON数据。
  • 类似地,有send_text()send_bytes()send_json()发送消息。

示例:接收JSON消息

@app.websocket("/ws/json")
async def websocket_json_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_json()  # 假设客户端发送JSON
        response = {"status": "ok", "data": data}
        await websocket.send_json(response)

2. 依赖注入

FastAPI支持在WebSocket端点中使用依赖项,例如验证用户身份。依赖项函数可以是异步的,并且返回的值会传递给端点函数。

示例:添加一个简单的依赖来验证查询参数

from fastapi import WebSocket, Depends

def get_token(websocket: WebSocket):
    # 从查询参数获取token,例如 ws://localhost:8000/ws?token=abc
    token = websocket.query_params.get("token")
    if not token:
        await websocket.close()  # 如果没有token,关闭连接
        raise WebSocketDisconnect()
    return token

@app.websocket("/ws/auth")
async def websocket_auth_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
    await websocket.accept()
    await websocket.send_text(f"Authenticated with token: {token}")
    # 继续处理其他消息

3. 处理连接生命周期

除了消息循环,还可以处理连接事件,如打开和关闭。FastAPI允许你在端点函数内自定义逻辑。例如,在连接关闭时记录日志。

示例:记录连接打开和关闭

@app.websocket("/ws/log")
async def websocket_log_endpoint(websocket: WebSocket):
    print("WebSocket connection opened")
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except Exception as e:
        print(f"WebSocket connection closed: {e}")
        await websocket.close()

4. 多个客户端连接

在实际应用中,可能需要管理多个WebSocket连接。FastAPI本身不提供内置的广播机制,但你可以结合asyncio或第三方库(如websockets)实现。基本思路是维护一个连接列表,在循环中广播消息。

简单示例:广播消息到所有连接

connections = []

@app.websocket("/ws/broadcast")
async def websocket_broadcast_endpoint(websocket: WebSocket):
    await websocket.accept()
    connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            for conn in connections:
                await conn.send_text(f"Broadcast: {data}")
    except Exception as e:
        connections.remove(websocket)
        await websocket.close()

注意:这个示例是基本的,实际应用中需要考虑并发安全和性能优化。

总结

在FastAPI中定义WebSocket端点非常简单,只需使用@app.websocket()装饰器和异步处理函数。通过接收和发送消息,你可以构建实时应用。结合依赖注入和进阶功能,FastAPI提供了强大的WebSocket支持,使开发过程高效且易于维护。

关键要点:

  • WebSocket提供双向、持久的连接。
  • 使用@app.websocket(path)定义端点。
  • 处理函数接受WebSocket对象,用await websocket.accept()接受连接。
  • 在循环中处理消息,使用receive_*send_*方法。
  • 利用FastAPI的依赖系统处理认证等逻辑。
  • 注意异常处理和资源管理,确保连接正确关闭。

通过本教程,你应该能够开始在FastAPI中实现WebSocket功能。继续实践和探索,以构建更复杂的实时应用!

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包