FastAPI 教程

13.6 异步任务队列(Celery + Redis)

FastAPI异步任务队列教程:Celery与Redis集成指南

FastAPI 教程

本教程详细介绍了如何在FastAPI中集成Celery和Redis来构建高效的异步任务队列。从基础概念到实战代码,逐步指导,适合初学者学习,帮助提升Web应用性能。

推荐工具
PyCharm专业版开发必备

功能强大的Python IDE,提供智能代码补全、代码分析、调试和测试工具,提高Python开发效率。特别适合处理列表等数据结构的开发工作。

了解更多

FastAPI异步任务队列教程:Celery与Redis集成

引言

在Web开发中,处理耗时任务如发送邮件、数据处理或API调用时,如果直接在请求中执行,可能会导致用户等待时间长、服务器响应慢。异步任务队列可以解决这个问题,将任务放入后台处理,让应用快速返回响应。本教程将指导你在FastAPI中使用Celery(分布式任务队列)和Redis(消息代理)来实现异步任务处理,适合新手入门。

什么是Celery和Redis?

  • Celery:一个分布式任务队列框架,用于异步执行任务。它支持任务调度、监控和并发处理。
  • Redis:一个内存数据结构存储,用作消息代理,在Celery中存储任务队列和结果。它快速、可靠,适合处理大量任务。 结合FastAPI,你可以构建高性能的异步Web应用。

前提条件

在开始前,请确保你已经具备以下条件:

  • Python 3.7或更高版本安装。
  • 基本了解Python和FastAPI框架(如果不熟悉,可以先学习FastAPI基础)。
  • 安装包管理工具如pip。

步骤1:安装必要的包

打开终端或命令行,运行以下命令安装FastAPI、Celery、Redis和Uvicorn(ASGI服务器):

pip install fastapi uvicorn celery redis

如果使用虚拟环境,先激活环境再安装。

步骤2:设置Redis

Redis将作为消息代理存储任务。你可以从官网下载并安装Redis,或使用Docker快速启动。

  • 本地安装:访问Redis官网下载并运行Redis服务器。
  • 使用Docker:运行docker run -d -p 6379:6379 redis启动Redis容器。 确保Redis服务在默认端口6379上运行,以便Celery连接。

步骤3:配置Celery应用

创建Python文件,例如celery_app.py,并配置Celery。

from celery import Celery

# 初始化Celery应用,使用Redis作为消息代理
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",  # Redis连接URL
    backend="redis://localhost:6379/0"   # 存储结果的后端
)

# 可选:配置任务设置
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

这里,我们创建了一个Celery实例,指定broker和backend为Redis。broker用于发送任务,backend用于存储任务结果。

步骤4:创建异步任务

在同一文件中或新建文件,定义具体的异步任务。例如,创建一个发送邮件的模拟任务。

from celery_app import celery_app
import time

@celery_app.task  # 装饰器标记为Celery任务
def send_email(email_address: str, message: str):
    """模拟发送邮件的异步任务"""
    print(f"开始发送邮件到 {email_address}")
    time.sleep(5)  # 模拟耗时操作,例如网络请求
    print(f"邮件发送完成: {message}")
    return {"status": "success", "email": email_address}

这个任务使用@celery_app.task装饰器,Celery将把它视为可异步执行的任务。time.sleep(5)模拟处理时间,在真实场景中可能是API调用或数据库操作。

步骤5:集成到FastAPI应用

创建FastAPI应用文件,例如main.py,并调用Celery任务。

from fastapi import FastAPI
from celery_app import send_email  # 导入Celery任务

app = FastAPI(title="异步任务队列示例")

@app.post("/send-email/")
async def send_email_endpoint(email: str, message: str):
    """FastAPI端点:触发异步邮件发送任务"""
    # 调用Celery任务,使用.delay()方法异步执行
    task = send_email.delay(email, message)
    return {
        "message": "邮件任务已提交到队列",
        "task_id": task.id  # 返回任务ID以便追踪
    }

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """检查任务状态"""
    from celery_app import celery_app
    result = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None
    }

在这个示例中,FastAPI端点/send-email/接收邮件参数,然后调用send_email.delay()将任务推送到Celery队列,立即返回响应。另一个端点/task-status/允许用户查询任务状态,使用任务ID从Redis后端获取结果。

步骤6:运行和测试

  1. 启动Redis服务:如果使用本地安装,运行redis-server;如果使用Docker,确保容器运行中。
  2. 启动Celery Worker:在终端中,运行以下命令启动Celery worker处理任务:
    celery -A celery_app worker --loglevel=info
    
    -A celery_app指定Celery应用模块,celery_app是我们的文件名(不含.py扩展)。
  3. 启动FastAPI应用:在另一个终端中,运行Uvicorn服务器:
    uvicorn main:app --reload
    
  4. 测试:使用浏览器或工具如curl访问API。例如:
    • 发送POST请求到http://localhost:8000/send-email/,参数email=test@example.com&message=Hello,你会立即收到响应。
    • 然后,在Celery worker终端查看任务执行日志,或用GET请求/task-status/{task_id}检查状态。

最佳实践和注意事项

  • 错误处理:在Celery任务中添加异常处理,使用@celery_app.task(bind=True)访问任务实例,记录错误。
  • 监控:使用Flower(Celery监控工具)或集成日志系统来跟踪任务执行。
  • 扩展性:在生产环境中,考虑使用多个Celery worker、设置任务优先级或使用其他消息代理如RabbitMQ。
  • 安全性:确保Redis配置安全,避免暴露敏感数据。
  • 性能优化:根据任务负载调整Celery并发设置。

总结

通过本教程,你学会了如何在FastAPI中集成Celery和Redis来构建异步任务队列。关键步骤包括安装包、配置Celery、定义任务、集成FastAPI端点和运行服务。这可以显著提升应用响应速度和用户体验。继续实践,探索更多高级功能如任务重试、调度和分布式处理!

如果有问题,参考Celery和FastAPI官方文档,或在线社区寻求帮助。祝你学习愉快!

开发工具推荐
Python开发者工具包

包含虚拟环境管理、代码格式化、依赖管理、测试框架等Python开发全流程工具,提高开发效率。特别适合处理复杂数据结构和算法。

获取工具包