FastAPI与Celery联动:打造高效异步任务处理佳构

FastAPI与Celery联动:构建高效异步任务处理体系

一、Celery基础概念

Celery的架构由三部分构成:客户端用于发送任务、消息代理存储任务队列、工作者执行任务。常见的消息代理可选Redis(默认端口6379)或RabbitMQ(默认端口5672)。任务结果的存储建议使用Redis或关系型数据库,需在配置中明确指定backend参数。

二、FastAPI与Celery集成步骤

用户发起请求后,经FastAPI路由处理将任务入队,任务由消息队列传递给Celery工作者执行,执行完的结果存储,客户端可查询结果。

三、代码实现与解析

1. 安装依赖

通过以下命令安装所需依赖:

pip install fastapi==0.103.2 celery==5.3.4 redis==4.5.5 uvicorn==0.23.2 pydantic==2.5.2

2. 项目结构

项目结构如下:

project/
├── main.py
├── celery_app.py
└── tasks.py

3. 核心代码示例

celery_app.py

from celery import Celery

celery_app = Celery(
    'worker',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['project.tasks']
)
celery_app.conf.update(task_track_started=True)

tasks.py

from .celery_app import celery_app


@celery_app.task
def process_data(data: dict) -> dict:
    """模拟耗时数据处理任务"""
    import time
    time.sleep(5)
    return {"result": f"Processed {data['input']}"}

main.py

from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_data

app = FastAPI()


class RequestData(BaseModel):
    input: str
    priority: int = 1


@app.post("/submit-task")
async def submit_task(data: RequestData):
    """提交异步任务接口"""
    task = process_data.apply_async(
        kwargs={"data": data.dict()},
        priority=data.priority
    )
    return {"task_id": task.id}

四、任务状态查询实现

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """任务状态查询接口"""
    from celery.result import AsyncResult
    result = AsyncResult(task_id, app=celery_app)
    return {
        "status": result.status,
        "result": result.result if result.ready() else None
    }

五、课后Quiz

Q1:为何需为Celery配置不同Redis数据库作为broker和backend?
A1:为区分存储用途,避免任务队列与结果数据相互干扰。broker用0号库存放置理中的任务,backend用1号库保存任务状态和结果。

Q2:如何处理长时间运行任务的超时问题?
A2:在任务装饰器中设置soft_time_limit参数,例如@task(soft_time_limit=300),同时配置worker的--maxtasksperchild参数限制最大任务数。

六、常见报错解决方案

错误现象:Worker收到任务但未执行

检查要点:
1. 确认Redis服务运行状态:执行redis-cli ping应返回PONG。
2. 检查任务模块是否在配置的include中正确指向。
3. 验证任务函数是否用@celery_app.task装饰器定义。

错误现象:任务结果无法获取

解决方案:
1. 检查backend配置是否正确。
2. 确认任务已执行完成(状态为SUCCESS)。
3. 检查Redis存储空间是否充足。

任务优先级配置

启动worker时指定队列:

celery -A project.celery_app worker -Q high_priority,default -l INFO

接口调用时指定优先级:

process_data.apply_async(
    kwargs={"data": data},
    queue='high_priority' if data.priority > 5 else 'default'
)

往期文章归档(示例)

免费在线工具(示例)

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12998.html

(0)
LomuLomu
上一篇 2025 年 7 月 27 日
下一篇 2025 年 7 月 29 日

相关推荐

  • 【前端】javaScript

    目录 一、JavaScript概述 1.1 引入方式 二、基础语法 2.1 变量 2.2 数据类型 2.3 运算符 2.4 对象 2.4.1 数组 2.4.2 函数 2.4.3 对象 三、jQuery 3.1 引入依赖 3.2 jQuery语法 3.3 jQuery选择器 3.4 jQuery事件 3.5 操作元素 3.6 常用方法 一、JavaScript…

    2024 年 12 月 28 日
    37000
  • 2025年最新PyCharm激活码分享 | 永久破解PyCharm至2099年教程(支持Win/Mac/Linux)

    本方法适用于JetBrains全家桶,包括PyCharm、IDEA、DataGrip、Goland等所有产品! 先给大家看看最新PyCharm版本成功破解的截图,有效期直达2099年,完美解决激活问题! 下面我将用详细的图文教程,手把手教你如何将PyCharm激活到2099年。这个方法同样适用于旧版本! 兼容所有操作系统:Windows/Mac/Linux …

    PyCharm激活码 2025 年 7 月 8 日
    12400
  • 2025年最新PyCharm激活码分享 | 永久破解教程(支持2099年)

    全面支持Jetbrains全家桶的破解方案 本教程适用于PyCharm、IDEA、DataGrip、Goland等Jetbrains全系列产品,下面展示的是最新版PyCharm成功破解至2099年的效果图: 接下来将详细介绍如何实现PyCharm永久激活,这个方法同样适用于旧版本! 跨平台支持:Windows/Mac/Linux全兼容 版本通用:新旧版本均可…

    PyCharm激活码 2025 年 8 月 28 日
    17600
  • 2025年最新DataGrip激活码永久破解教程 – 支持2099年全版本破解

    本教程同样适用于JetBrains全家桶(包括DataGrip、PyCharm、IDEA、Goland等) 先展示最新DataGrip版本成功破解的截图,可以看到已完美激活至2099年! 下面将通过详细的图文教程,一步步教你如何永久激活DataGrip至2099年。 本方法具有以下特点:- 全平台适用(Windows/Mac/Linux)- 兼容所有Data…

    DataGrip激活码 2025 年 7 月 7 日
    10400
  • 2025年最新PyCharm激活码永久破解教程(支持2099年)

    本方法适用于JetBrains全家桶,包括PyCharm、IDEA、DataGrip、Goland等所有产品! 先给大家看看最新PyCharm版本成功破解的截图,有效期直达2099年,完全免费使用! 下面我将用详细的图文步骤,手把手教你如何永久激活PyCharm到2099年。 这个方法不仅适用于最新版本,也兼容所有旧版PyCharm! Windows/Mac…

    PyCharm激活码 2025 年 8 月 14 日
    10300

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信