10.2 WebSocket 端点定义
FastAPI WebSocket端点定义教程:详细指南与代码示例
本教程详细讲解如何在FastAPI中定义WebSocket端点,包括基础概念、代码实现、进阶用法,并附有完整示例,帮助开发者快速上手WebSocket实时通信。
FastAPI WebSocket端点定义教程
WebSocket是一种在网络客户端和服务器之间建立双向通信的协议,常用于实时应用,如聊天应用、游戏、实时数据推送等。与传统的HTTP请求不同,WebSocket允许在单个连接上持续发送和接收数据,避免了轮询的开销。FastAPI作为一个现代的Web框架,内置了WebSocket支持,使定义端点变得简单高效。
什么是WebSocket?
WebSocket协议定义在RFC 6455中,它提供了一个持久的连接,客户端和服务器可以随时发送消息,无需重新建立连接。这与HTTP的请求-响应模式不同,HTTP每次请求都需要新建连接,而WebSocket保持连接开放,直到任何一方关闭它。
主要特点:
- 双向通信:服务器和客户端都可以主动发送消息。
- 低延迟:由于连接持久,减少了握手开销。
- 全双工:消息可以同时发送和接收。
为什么在FastAPI中使用WebSocket?
FastAPI基于Starlette,后者支持异步WebSocket处理,使得在Python中构建高性能的WebSocket应用变得容易。使用FastAPI定义WebSocket端点,你可以:
- 集成到现有FastAPI项目中,统一路由和依赖管理。
- 利用依赖注入系统,处理身份验证、数据库连接等。
- 保持代码简洁和可维护性。
如何定义WebSocket端点?
在FastAPI中,使用@app.websocket()装饰器来定义WebSocket端点。这个装饰器接受一个路径字符串作为参数,类似于HTTP路由。WebSocket端点的处理函数是一个异步函数,接收一个WebSocket对象作为参数,通常命名为websocket,并通过await websocket.accept()接受连接。
基本步骤:
- 导入FastAPI和WebSocket类:从fastapi模块导入。
- 创建FastAPI实例。
- 使用装饰器定义端点。
- 实现处理函数:接受连接、接收消息、发送响应、处理关闭。
代码示例:简单聊天端点
以下是一个完整的FastAPI应用,定义了一个WebSocket端点,实现基本的双向消息传递。
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 接受WebSocket连接
await websocket.accept()
try:
while True:
# 接收来自客户端的消息,假设为文本类型
data = await websocket.receive_text()
# 发送响应回客户端
await websocket.send_text(f"Message received: {data}")
except Exception as e:
# 发生异常时关闭连接,例如客户端断开
await websocket.close()
print(f"Connection closed: {e}")
代码解释:
@app.websocket("/ws"):定义一个WebSocket端点,路径为/ws。客户端可以通过ws://localhost:8000/ws连接。- 异步函数:函数必须用
async def定义,因为WebSocket操作是异步的。 websocket: WebSocket参数:FastAPI会自动注入WebSocket对象,用于管理连接。await websocket.accept():接受连接,这是WebSocket握手后的第一步。- 循环接收消息:使用
while True循环持续接收消息。websocket.receive_text()等待文本消息;还有其他方法如receive_bytes()、receive_json()处理不同类型。 - 发送响应:使用
websocket.send_text()发送消息回客户端。 - 异常处理:在
try-except块中,处理断开连接等异常,确保资源释放。
进阶主题
1. 处理不同类型的消息
WebSocket可以发送和接收文本、二进制或JSON数据。FastAPI提供相应方法:
receive_text():接收字符串。receive_bytes():接收字节数据。receive_json():接收并解析JSON数据。- 类似地,有
send_text()、send_bytes()、send_json()发送消息。
示例:接收JSON消息
@app.websocket("/ws/json")
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_json() # 假设客户端发送JSON
response = {"status": "ok", "data": data}
await websocket.send_json(response)
2. 依赖注入
FastAPI支持在WebSocket端点中使用依赖项,例如验证用户身份。依赖项函数可以是异步的,并且返回的值会传递给端点函数。
示例:添加一个简单的依赖来验证查询参数
from fastapi import WebSocket, Depends
def get_token(websocket: WebSocket):
# 从查询参数获取token,例如 ws://localhost:8000/ws?token=abc
token = websocket.query_params.get("token")
if not token:
await websocket.close() # 如果没有token,关闭连接
raise WebSocketDisconnect()
return token
@app.websocket("/ws/auth")
async def websocket_auth_endpoint(websocket: WebSocket, token: str = Depends(get_token)):
await websocket.accept()
await websocket.send_text(f"Authenticated with token: {token}")
# 继续处理其他消息
3. 处理连接生命周期
除了消息循环,还可以处理连接事件,如打开和关闭。FastAPI允许你在端点函数内自定义逻辑。例如,在连接关闭时记录日志。
示例:记录连接打开和关闭
@app.websocket("/ws/log")
async def websocket_log_endpoint(websocket: WebSocket):
print("WebSocket connection opened")
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except Exception as e:
print(f"WebSocket connection closed: {e}")
await websocket.close()
4. 多个客户端连接
在实际应用中,可能需要管理多个WebSocket连接。FastAPI本身不提供内置的广播机制,但你可以结合asyncio或第三方库(如websockets)实现。基本思路是维护一个连接列表,在循环中广播消息。
简单示例:广播消息到所有连接
connections = []
@app.websocket("/ws/broadcast")
async def websocket_broadcast_endpoint(websocket: WebSocket):
await websocket.accept()
connections.append(websocket)
try:
while True:
data = await websocket.receive_text()
for conn in connections:
await conn.send_text(f"Broadcast: {data}")
except Exception as e:
connections.remove(websocket)
await websocket.close()
注意:这个示例是基本的,实际应用中需要考虑并发安全和性能优化。
总结
在FastAPI中定义WebSocket端点非常简单,只需使用@app.websocket()装饰器和异步处理函数。通过接收和发送消息,你可以构建实时应用。结合依赖注入和进阶功能,FastAPI提供了强大的WebSocket支持,使开发过程高效且易于维护。
关键要点:
- WebSocket提供双向、持久的连接。
- 使用
@app.websocket(path)定义端点。 - 处理函数接受
WebSocket对象,用await websocket.accept()接受连接。 - 在循环中处理消息,使用
receive_*和send_*方法。 - 利用FastAPI的依赖系统处理认证等逻辑。
- 注意异常处理和资源管理,确保连接正确关闭。
通过本教程,你应该能够开始在FastAPI中实现WebSocket功能。继续实践和探索,以构建更复杂的实时应用!