10.4 WebSocket 连接管理
FastAPI WebSocket 连接管理:从入门到实战
本教程详细讲解如何在FastAPI中使用WebSocket进行连接管理,涵盖建立、维护和断开连接的完整流程,适合初学者快速上手实现实时应用。
FastAPI WebSocket 连接管理教程
介绍
WebSocket 是一种全双工通信协议,允许客户端和服务器之间建立持久连接,实现实时数据交换。在 FastAPI 中,WebSocket 支持使得构建聊天应用、实时通知等场景变得简单。本教程将重点介绍 WebSocket 连接管理,帮助你从零开始掌握如何在 FastAPI 中高效管理 WebSocket 连接。
前置条件
- 安装 Python 3.7 或更高版本。
- 安装 FastAPI 和 Uvicorn(一个 ASGI 服务器)。
- 通过 pip 安装:
pip install fastapi uvicorn
设置 FastAPI WebSocket
FastAPI 基于 ASGI 标准,原生支持 WebSocket。首先,创建一个简单的 FastAPI 应用并引入 WebSocket 相关模块。
定义 WebSocket 端点
在 FastAPI 中,使用 @app.websocket 装饰器定义 WebSocket 端点。端点路径类似于 HTTP 路由,用于客户端连接。
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
# 一个简单的 HTML 页面用于测试 WebSocket
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages');
var message = document.createElement('li');
var content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
function sendMessage(event) {
var input = document.getElementById("messageText");
ws.send(input.value);
input.value = '';
event.preventDefault();
}
</script>
</body>
</html>
"""
@app.get("/")
async def get():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
# 这里将实现连接管理
pass
连接管理详解
WebSocket 连接管理涉及连接的整个生命周期:建立、消息处理、断开。以下是分步详解。
5.1 建立连接
当客户端尝试连接 WebSocket 端点时,服务器需要接受连接。在 FastAPI 中,使用 await websocket.accept() 来建立连接。
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受客户端连接
# 后续处理
接受连接后,连接已建立,服务器可以开始监听消息。
5.2 接收消息
使用循环来接收来自客户端的消息。await websocket.receive_text() 用于接收文本消息,类似地,还有 receive_bytes() 等。
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
try:
data = await websocket.receive_text() # 接收文本消息
# 处理接收到的消息
print(f"收到消息: {data}")
except Exception as e:
# 处理异常,例如客户端断开连接
print(f"连接错误: {e}")
break
5.3 发送消息
使用 await websocket.send_text(message) 向客户端发送消息。可以在接收消息后立即回复,或基于事件发送。
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
try:
data = await websocket.receive_text()
print(f"收到消息: {data}")
# 发送回应
response = f"服务器回复: {data}"
await websocket.send_text(response)
except Exception as e:
print(f"连接错误: {e}")
break
5.4 断开连接
连接断开可以通过异常处理或主动关闭。在循环中捕获异常(如 websockets.exceptions.ConnectionClosed)来检测断开。或者使用 await websocket.close() 主动关闭连接。
from websockets.exceptions import ConnectionClosed # 需要安装 websockets 库: pip install websockets
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print(f"收到消息: {data}")
await websocket.send_text(f"回复: {data}")
except ConnectionClosed:
print("客户端断开连接")
finally:
# 可选的清理工作
print("连接关闭,执行清理")
完整示例:简单聊天室
结合以上部分,创建一个简单的聊天室示例,支持多个客户端连接并广播消息。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
# 存储活跃的 WebSocket 连接
connections = []
@app.get("/")
async def get():
html = """
<!DOCTYPE html>
<html>
<head><title>WebSocket Chat</title></head>
<body>
<h1>WebSocket Chat Room</h1>
<form onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'></ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages');
var message = document.createElement('li');
message.textContent = event.data;
messages.appendChild(message);
};
function sendMessage(event) {
var input = document.getElementById("messageText");
ws.send(input.value);
input.value = '';
event.preventDefault();
}
</script>
</body>
</html>
"""
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connections.append(websocket) # 添加到连接列表
try:
while True:
data = await websocket.receive_text()
# 广播消息到所有连接的客户端
for connection in connections:
try:
await connection.send_text(f"用户说: {data}")
except:
# 如果某个连接失败,从列表中移除
connections.remove(connection)
except WebSocketDisconnect:
connections.remove(websocket) # 断开时移除
print("一个客户端断开连接")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
进阶主题
- 连接池管理: 对于大量连接,考虑使用连接池来优化性能,例如使用
asyncio.Queue或第三方库。 - 身份验证: 在连接时验证用户,例如通过查询参数或头信息传递令牌。
- 错误处理: 添加更多异常处理,处理网络延迟、超时等情况。
- 广播和群组: 实现群组聊天,将连接分组管理。
总结
本教程详细介绍了在 FastAPI 中管理 WebSocket 连接的全过程,从基础设置到完整示例。通过实践,你可以轻松构建实时应用。记住关键步骤:接受连接、接收和发送消息、处理断开。扩展应用时,考虑连接管理的最佳实践,如内存优化和错误恢复。
继续探索 FastAPI 文档以了解更多高级功能,如依赖注入和后台任务。祝你学习愉快!