FastAPI 教程

8.4 数据库会话中间件

FastAPI数据库会话中间件完整教程:从入门到精通

FastAPI 教程

本教程详细介绍了如何在FastAPI中使用数据库会话中间件,涵盖SQLAlchemy集成、异步支持、依赖注入实现,以及最佳实践和SEO优化提示,适合新手快速上手。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI数据库会话中间件教程

简介

数据库会话中间件是FastAPI应用中处理数据库连接和事务的关键组件。它能帮助您高效管理数据库会话,避免资源泄露,并提升性能。本教程将从零开始,教您如何实现一个简单的数据库会话中间件,使用SQLAlchemy作为示例。

为什么需要数据库会话中间件?

  • 连接管理:确保数据库连接在请求生命周期内正确打开和关闭。
  • 事务处理:提供一致性,支持回滚和提交。
  • 性能优化:通过连接池减少开销。
  • 异步支持:利用FastAPI的异步特性处理数据库操作。

前提条件

  • 安装Python 3.7+ 和 FastAPI。
  • 安装SQLAlchemy和异步驱动(如asyncpg用于PostgreSQL)。
  • 基础FastAPI知识。

步骤1:设置环境和依赖

首先,安装必要的包:

pip install fastapi uvicorn sqlalchemy databases

步骤2:创建数据库模型和连接

使用SQLAlchemy定义模型:

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)

设置数据库连接字符串(例如,使用SQLite示例):

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, echo=True)

步骤3:实现数据库会话中间件

创建会话中间件,确保每个请求有独立的会话:

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

# 创建FastAPI应用
app = FastAPI()

# 创建异步会话工厂
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# 数据库会话中间件函数
def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        yield session

步骤4:使用依赖注入集成会话

在FastAPI中,通过依赖注入在端点中使用会话:

from fastapi import Depends

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # 查询数据库
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if user:
        return {"id": user.id, "name": user.name}
    return {"error": "User not found"}

步骤5:处理事务和关闭会话

在中间件中自动管理事务:

  • 在请求开始时开启会话。
  • 在请求结束时自动提交或回滚,并关闭会话。

优化get_db函数:

async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()  # 成功时提交
        except Exception:
            await session.rollback()  # 出错时回滚
            raise
        finally:
            await session.close()  # 确保关闭

步骤6:完整示例代码

整合到一个文件中:

from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

app = FastAPI()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)

async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

@app.get("/users/{user_id}")
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    return {"user": user if user else None}

步骤7:SEO优化提示

  • URL友好:使用清晰的路由(如/users/{id})。
  • 响应描述:在端点添加详细文档,帮助搜索引擎理解内容。
  • 性能监控:使用工具如Prometheus监控数据库会话性能。
  • 缓存策略:结合中间件实现缓存,减少数据库负载。

总结

数据库会话中间件是FastAPI应用的核心,通过正确实现,您可以确保数据安全和应用效率。本教程覆盖了基础到高级的用法,适合新手入门。练习时,尝试扩展功能,如连接池配置或错误日志记录。

进阶阅读

  • 探索FastAPI官方文档了解更多中间件选项。
  • 学习如何使用PostgreSQL或MySQL进行生产部署。
  • 研究异步数据库驱动以提升性能。

Happy coding!

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包