23.4 在线状态与通知
FastAPI实战教程:实时协作应用在线状态与通知实现指南
本教程详细讲解如何使用FastAPI构建实时协作应用,实现用户在线状态检测和通知功能。适合初学者学习,包含代码示例和实践步骤,帮助快速掌握FastAPI在实时应用中的应用。
推荐工具
综合项目实战 - 实时协作应用在线状态与通知
欢迎来到FastAPI实战教程!本章节将带您构建一个完整的实时协作应用,重点实现用户在线状态管理和通知系统。本教程面向新手,力求简单易懂,通过步骤化讲解和代码示例帮助您上手FastAPI的高级应用。
项目概述
实时协作应用(如在线编辑器或聊天工具)需要实时更新用户状态(如在线/离线)和发送通知(如消息提醒)。我们将使用FastAPI作为后端框架,结合WebSockets实现实时通信。
为什么需要在线状态和通知?
- 在线状态:让用户知道谁在线,提升协作体验。
- 通知:及时推送事件,如新消息或文档更新。
技术栈
- FastAPI:用于构建高效API。
- WebSockets:实现双向实时通信。
- 数据库(示例使用SQLAlchemy和SQLite):存储用户和通知数据。
- Python 3.7+:编程语言环境。
实现步骤
步骤1:项目设置
首先,创建一个新的FastAPI项目,安装依赖包。
mkdir realtime-app && cd realtime-app
pip install fastapi uvicorn websockets sqlalchemy
初始化FastAPI应用,创建一个main.py文件。
步骤2:用户认证基础
实现简单的用户模型和认证机制(为简化,使用基本认证)。
# models.py
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, nullable=False)
online = Column(Boolean, default=False) # 在线状态字段
步骤3:WebSockets实现在线状态
使用FastAPI的WebSocketEndpoint或websocket装饰器处理连接。
# websocket_handler.py
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
# 更新用户状态为在线
user = get_user_from_websocket(websocket) # 假设函数获取用户
user.online = True
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
# 更新用户状态为离线
user.online = False
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
# 在main.py中集成
from fastapi import FastAPI, WebSocket
app = FastAPI()
manager = ConnectionManager()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# 处理心跳或消息
await manager.send_personal_message(f"Echo: {data}", websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
步骤4:通知系统
构建通知模型和API端点,通过WebSockets推送通知。
# models.py 添加通知模型
class Notification(Base):
__tablename__ = 'notifications'
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, nullable=False)
message = Column(String, nullable=False)
read = Column(Boolean, default=False)
# 在WebSocket中发送通知
async def send_notification(user_id: int, message: str):
# 保存通知到数据库
notification = Notification(user_id=user_id, message=message)
# 通过WebSocket推送实时通知
for connection in manager.active_connections:
if connection.user_id == user_id: # 假设连接有用户ID
await connection.send_text(json.dumps({"type": "notification", "data": message}))
步骤5:整合和测试
创建API端点用于管理用户和通知,并运行应用。
# main.py 添加路由
@app.get("/users/status")
async def get_user_status():
# 返回在线用户列表
online_users = [user for user in users if user.online]
return {"online_users": online_users}
@app.post("/notifications/send")
async def send_notification_api(user_id: int, message: str):
await send_notification(user_id, message)
return {"message": "Notification sent"}
# 运行应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
代码解释和最佳实践
- 简单易懂:代码结构化,注释清晰,便于新手理解。
- 错误处理:在实际项目中添加异常处理,如数据库连接错误。
- 扩展性:可以集成消息队列(如Redis)或前端框架(如Vue.js)以增强功能。
总结
通过本教程,您学习了如何使用FastAPI构建实时协作应用,实现基本在线状态检测和通知推送。这只是一个起点,您可以扩展功能,如添加群聊、文件共享等。继续探索FastAPI的异步特性和WebSockets,打造更强大的实时应用。
祝您学习愉快,如有问题,参考FastAPI官方文档进一步学习!
开发工具推荐