FastAPI与Celery联动:构建高效异步任务处理体系
一、Celery基础概念
Celery的架构由三部分构成:客户端用于发送任务、消息代理存储任务队列、工作者执行任务。常见的消息代理可选Redis(默认端口6379)或RabbitMQ(默认端口5672)。任务结果的存储建议使用Redis或关系型数据库,需在配置中明确指定backend参数。
二、FastAPI与Celery集成步骤
用户发起请求后,经FastAPI路由处理将任务入队,任务由消息队列传递给Celery工作者执行,执行完的结果存储,客户端可查询结果。
三、代码实现与解析
1. 安装依赖
通过以下命令安装所需依赖:
pip install fastapi==0.103.2 celery==5.3.4 redis==4.5.5 uvicorn==0.23.2 pydantic==2.5.2
2. 项目结构
项目结构如下:
project/
├── main.py
├── celery_app.py
└── tasks.py
3. 核心代码示例
celery_app.py
from celery import Celery
celery_app = Celery(
'worker',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['project.tasks']
)
celery_app.conf.update(task_track_started=True)
tasks.py
from .celery_app import celery_app
@celery_app.task
def process_data(data: dict) -> dict:
"""模拟耗时数据处理任务"""
import time
time.sleep(5)
return {"result": f"Processed {data['input']}"}
main.py
from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_data
app = FastAPI()
class RequestData(BaseModel):
input: str
priority: int = 1
@app.post("/submit-task")
async def submit_task(data: RequestData):
"""提交异步任务接口"""
task = process_data.apply_async(
kwargs={"data": data.dict()},
priority=data.priority
)
return {"task_id": task.id}
四、任务状态查询实现
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""任务状态查询接口"""
from celery.result import AsyncResult
result = AsyncResult(task_id, app=celery_app)
return {
"status": result.status,
"result": result.result if result.ready() else None
}
五、课后Quiz
Q1:为何需为Celery配置不同Redis数据库作为broker和backend?
A1:为区分存储用途,避免任务队列与结果数据相互干扰。broker用0号库存放置理中的任务,backend用1号库保存任务状态和结果。
Q2:如何处理长时间运行任务的超时问题?
A2:在任务装饰器中设置soft_time_limit参数,例如@task(soft_time_limit=300),同时配置worker的--maxtasksperchild参数限制最大任务数。
六、常见报错解决方案
错误现象:Worker收到任务但未执行
检查要点:
1. 确认Redis服务运行状态:执行redis-cli ping
应返回PONG。
2. 检查任务模块是否在配置的include中正确指向。
3. 验证任务函数是否用@celery_app.task装饰器定义。
错误现象:任务结果无法获取
解决方案:
1. 检查backend配置是否正确。
2. 确认任务已执行完成(状态为SUCCESS)。
3. 检查Redis存储空间是否充足。
任务优先级配置
启动worker时指定队列:
celery -A project.celery_app worker -Q high_priority,default -l INFO
接口调用时指定优先级:
process_data.apply_async(
kwargs={"data": data},
queue='high_priority' if data.priority > 5 else 'default'
)
往期文章归档(示例)
免费在线工具(示例)
- CMDragon 在线工具 - 多功能AI工具箱与开发者工具集 | 免费实用工具
- 应用商店 - 搜罗千款提升效率与开发的AI工具与实用程序 | 免费工具
- AI文本生成图像工具 - 应用商店 | 免费工具
文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12998.html