13.6 异步任务队列(Celery + Redis)
FastAPI异步任务队列教程:Celery与Redis集成指南
本教程详细介绍了如何在FastAPI中集成Celery和Redis来构建高效的异步任务队列。从基础概念到实战代码,逐步指导,适合初学者学习,帮助提升Web应用性能。
FastAPI异步任务队列教程:Celery与Redis集成
引言
在Web开发中,处理耗时任务如发送邮件、数据处理或API调用时,如果直接在请求中执行,可能会导致用户等待时间长、服务器响应慢。异步任务队列可以解决这个问题,将任务放入后台处理,让应用快速返回响应。本教程将指导你在FastAPI中使用Celery(分布式任务队列)和Redis(消息代理)来实现异步任务处理,适合新手入门。
什么是Celery和Redis?
- Celery:一个分布式任务队列框架,用于异步执行任务。它支持任务调度、监控和并发处理。
- Redis:一个内存数据结构存储,用作消息代理,在Celery中存储任务队列和结果。它快速、可靠,适合处理大量任务。 结合FastAPI,你可以构建高性能的异步Web应用。
前提条件
在开始前,请确保你已经具备以下条件:
- Python 3.7或更高版本安装。
- 基本了解Python和FastAPI框架(如果不熟悉,可以先学习FastAPI基础)。
- 安装包管理工具如pip。
步骤1:安装必要的包
打开终端或命令行,运行以下命令安装FastAPI、Celery、Redis和Uvicorn(ASGI服务器):
pip install fastapi uvicorn celery redis
如果使用虚拟环境,先激活环境再安装。
步骤2:设置Redis
Redis将作为消息代理存储任务。你可以从官网下载并安装Redis,或使用Docker快速启动。
- 本地安装:访问Redis官网下载并运行Redis服务器。
- 使用Docker:运行
docker run -d -p 6379:6379 redis启动Redis容器。 确保Redis服务在默认端口6379上运行,以便Celery连接。
步骤3:配置Celery应用
创建Python文件,例如celery_app.py,并配置Celery。
from celery import Celery
# 初始化Celery应用,使用Redis作为消息代理
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0", # Redis连接URL
backend="redis://localhost:6379/0" # 存储结果的后端
)
# 可选:配置任务设置
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
)
这里,我们创建了一个Celery实例,指定broker和backend为Redis。broker用于发送任务,backend用于存储任务结果。
步骤4:创建异步任务
在同一文件中或新建文件,定义具体的异步任务。例如,创建一个发送邮件的模拟任务。
from celery_app import celery_app
import time
@celery_app.task # 装饰器标记为Celery任务
def send_email(email_address: str, message: str):
"""模拟发送邮件的异步任务"""
print(f"开始发送邮件到 {email_address}")
time.sleep(5) # 模拟耗时操作,例如网络请求
print(f"邮件发送完成: {message}")
return {"status": "success", "email": email_address}
这个任务使用@celery_app.task装饰器,Celery将把它视为可异步执行的任务。time.sleep(5)模拟处理时间,在真实场景中可能是API调用或数据库操作。
步骤5:集成到FastAPI应用
创建FastAPI应用文件,例如main.py,并调用Celery任务。
from fastapi import FastAPI
from celery_app import send_email # 导入Celery任务
app = FastAPI(title="异步任务队列示例")
@app.post("/send-email/")
async def send_email_endpoint(email: str, message: str):
"""FastAPI端点:触发异步邮件发送任务"""
# 调用Celery任务,使用.delay()方法异步执行
task = send_email.delay(email, message)
return {
"message": "邮件任务已提交到队列",
"task_id": task.id # 返回任务ID以便追踪
}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""检查任务状态"""
from celery_app import celery_app
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
在这个示例中,FastAPI端点/send-email/接收邮件参数,然后调用send_email.delay()将任务推送到Celery队列,立即返回响应。另一个端点/task-status/允许用户查询任务状态,使用任务ID从Redis后端获取结果。
步骤6:运行和测试
- 启动Redis服务:如果使用本地安装,运行
redis-server;如果使用Docker,确保容器运行中。 - 启动Celery Worker:在终端中,运行以下命令启动Celery worker处理任务:
celery -A celery_app worker --loglevel=info-A celery_app指定Celery应用模块,celery_app是我们的文件名(不含.py扩展)。 - 启动FastAPI应用:在另一个终端中,运行Uvicorn服务器:
uvicorn main:app --reload - 测试:使用浏览器或工具如curl访问API。例如:
- 发送POST请求到
http://localhost:8000/send-email/,参数email=test@example.com&message=Hello,你会立即收到响应。 - 然后,在Celery worker终端查看任务执行日志,或用GET请求
/task-status/{task_id}检查状态。
- 发送POST请求到
最佳实践和注意事项
- 错误处理:在Celery任务中添加异常处理,使用
@celery_app.task(bind=True)访问任务实例,记录错误。 - 监控:使用Flower(Celery监控工具)或集成日志系统来跟踪任务执行。
- 扩展性:在生产环境中,考虑使用多个Celery worker、设置任务优先级或使用其他消息代理如RabbitMQ。
- 安全性:确保Redis配置安全,避免暴露敏感数据。
- 性能优化:根据任务负载调整Celery并发设置。
总结
通过本教程,你学会了如何在FastAPI中集成Celery和Redis来构建异步任务队列。关键步骤包括安装包、配置Celery、定义任务、集成FastAPI端点和运行服务。这可以显著提升应用响应速度和用户体验。继续实践,探索更多高级功能如任务重试、调度和分布式处理!
如果有问题,参考Celery和FastAPI官方文档,或在线社区寻求帮助。祝你学习愉快!