FastAPI 教程

10.6 WebSocket 认证与安全

FastAPI WebSocket认证与安全完全指南:新手快速上手

FastAPI 教程

详细教程介绍如何在FastAPI中实现WebSocket连接的认证与安全,包括JWT认证、常见安全威胁防护和代码示例,适合新手快速学习和应用。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI WebSocket认证与安全教程

介绍

WebSocket是一种在单个TCP连接上进行全双工通信的协议,常用于实时应用,如聊天应用、在线游戏等。FastAPI作为现代Web框架,内置了WebSocket支持。然而,开放WebSocket连接可能带来安全风险,因此认证和安全措施至关重要。本教程将逐步教你如何在FastAPI中实现WebSocket认证与安全。

WebSocket认证

为什么需要认证?

WebSocket连接通常在客户端和服务器之间建立长时间连接。如果没有认证,恶意用户可能会未授权访问,导致数据泄露或攻击。

常见认证方法

在FastAPI中,你可以使用以下几种方式为WebSocket添加认证:

  • JWT(JSON Web Tokens):轻量级的认证方式,适用于无状态连接。
  • 基本认证:通过用户名和密码进行简单认证。
  • OAuth2:更复杂的授权协议,适合大型应用。

使用JWT认证WebSocket

以下是使用JWT实现WebSocket认证的示例:

  1. 安装依赖:确保已安装python-josepasslib等库。

    pip install python-jose[cryptography] passlib[bcrypt]
    
  2. 代码实现

    from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
    from fastapi.security import OAuth2PasswordBearer
    from jose import JWTError, jwt
    from pydantic import BaseModel
    
    app = FastAPI()
    
    # 模拟用户数据库和密钥
    SECRET_KEY = "your-secret-key"
    ALGORITHM = "HS256"
    users_db = {"user1": {"password": "hashed_password"}}
    
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
    
    async def get_current_user(token: str = Depends(oauth2_scheme)):
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            username = payload.get("sub")
            if username is None:
                raise credentials_exception
        except JWTError:
            raise credentials_exception
        return username
    
    @app.websocket("/ws/")
    async def websocket_endpoint(websocket: WebSocket, token: str = None):
        # 检查是否有token
        if token is None:
            await websocket.close(code=1008, reason="未提供认证token")
            return
    
        # 验证token
        try:
            username = await get_current_user(token)
        except Exception:
            await websocket.close(code=1008, reason="认证失败")
            return
    
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"用户 {username}: {data}")
        except WebSocketDisconnect:
            print(f"用户 {username} 断开连接")
    

    解释:这个示例通过WebSocket连接传递token进行认证。如果token无效,连接将被关闭。

WebSocket安全

常见安全威胁

  • 跨站WebSocket劫持(CSWSH):类似于CSRF,攻击者诱导用户连接到恶意WebSocket。
  • 中间人攻击(MitM):如果没有加密,数据可能被窃听。
  • 数据注入:未验证的输入可能导致代码注入。

防护措施

  1. 使用HTTPS/WSS:确保WebSocket连接通过SSL/TLS加密(WSS协议)。

    # 在生产环境中,配置SSL证书
    uvicorn.run(app, host="0.0.0.0", port=8000, ssl_keyfile="key.pem", ssl_certfile="cert.pem")
    
  2. Origin验证:检查WebSocket请求的Origin头,防止跨域攻击。

    @app.websocket("/ws/")
    async def websocket_endpoint(websocket: WebSocket):
        origin = websocket.headers.get("origin")
        allowed_origins = ["https://example.com"]
        if origin not in allowed_origins:
            await websocket.close(code=1008, reason="Origin未授权")
            return
        await websocket.accept()
        # 后续处理
    
  3. 输入验证和清理:对接收的数据进行验证,避免注入攻击。

    from pydantic import ValidationError
    
    class Message(BaseModel):
        text: str
    
    @app.websocket("/ws/")
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                # 验证数据
                try:
                    message = Message(text=data)
                except ValidationError:
                    await websocket.send_text("数据格式无效")
                    continue
                # 处理安全的数据
                await websocket.send_text(f"收到: {message.text}")
        except WebSocketDisconnect:
            print("连接关闭")
    
  4. 限制连接数和频率:防止DoS攻击。

    from collections import defaultdict
    import time
    
    connections = defaultdict(int)
    MAX_CONNECTIONS = 10
    
    @app.websocket("/ws/")
    async def websocket_endpoint(websocket: WebSocket, client_ip: str):
        if connections[client_ip] >= MAX_CONNECTIONS:
            await websocket.close(code=1008, reason="连接数超限")
            return
        connections[client_ip] += 1
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"数据: {data}")
        except WebSocketDisconnect:
            connections[client_ip] -= 1
    

最佳实践

  • 总是使用认证:即使是内部应用,也添加认证层。
  • 启用WSS:在生产环境中强制使用加密连接。
  • 定期更新依赖:保持FastAPI和相关库的最新版本,以修复安全漏洞。
  • 监控日志:记录WebSocket连接和异常,便于问题排查。

总结

在FastAPI中实现WebSocket认证与安全是保护实时应用的关键步骤。通过JWT认证、Origin验证、HTTPS加密和输入验证,你可以有效防止常见攻击。作为新手,从简单示例开始,逐步集成安全措施,确保应用稳健运行。如有疑问,参考FastAPI官方文档或社区资源。

扩展阅读

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包