FastAPI与Celery联动:打造高效异步任务处理佳构

FastAPI与Celery联动:构建高效异步任务处理体系

一、Celery基础概念

Celery的架构由三部分构成:客户端用于发送任务、消息代理存储任务队列、工作者执行任务。常见的消息代理可选Redis(默认端口6379)或RabbitMQ(默认端口5672)。任务结果的存储建议使用Redis或关系型数据库,需在配置中明确指定backend参数。

二、FastAPI与Celery集成步骤

用户发起请求后,经FastAPI路由处理将任务入队,任务由消息队列传递给Celery工作者执行,执行完的结果存储,客户端可查询结果。

三、代码实现与解析

1. 安装依赖

通过以下命令安装所需依赖:

pip install fastapi==0.103.2 celery==5.3.4 redis==4.5.5 uvicorn==0.23.2 pydantic==2.5.2

2. 项目结构

项目结构如下:

project/
├── main.py
├── celery_app.py
└── tasks.py

3. 核心代码示例

celery_app.py

from celery import Celery

celery_app = Celery(
    'worker',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['project.tasks']
)
celery_app.conf.update(task_track_started=True)

tasks.py

from .celery_app import celery_app


@celery_app.task
def process_data(data: dict) -> dict:
    """模拟耗时数据处理任务"""
    import time
    time.sleep(5)
    return {"result": f"Processed {data['input']}"}

main.py

from fastapi import FastAPI
from pydantic import BaseModel
from .tasks import process_data

app = FastAPI()


class RequestData(BaseModel):
    input: str
    priority: int = 1


@app.post("/submit-task")
async def submit_task(data: RequestData):
    """提交异步任务接口"""
    task = process_data.apply_async(
        kwargs={"data": data.dict()},
        priority=data.priority
    )
    return {"task_id": task.id}

四、任务状态查询实现

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """任务状态查询接口"""
    from celery.result import AsyncResult
    result = AsyncResult(task_id, app=celery_app)
    return {
        "status": result.status,
        "result": result.result if result.ready() else None
    }

五、课后Quiz

Q1:为何需为Celery配置不同Redis数据库作为broker和backend?
A1:为区分存储用途,避免任务队列与结果数据相互干扰。broker用0号库存放置理中的任务,backend用1号库保存任务状态和结果。

Q2:如何处理长时间运行任务的超时问题?
A2:在任务装饰器中设置soft_time_limit参数,例如@task(soft_time_limit=300),同时配置worker的--maxtasksperchild参数限制最大任务数。

六、常见报错解决方案

错误现象:Worker收到任务但未执行

检查要点:
1. 确认Redis服务运行状态:执行redis-cli ping应返回PONG。
2. 检查任务模块是否在配置的include中正确指向。
3. 验证任务函数是否用@celery_app.task装饰器定义。

错误现象:任务结果无法获取

解决方案:
1. 检查backend配置是否正确。
2. 确认任务已执行完成(状态为SUCCESS)。
3. 检查Redis存储空间是否充足。

任务优先级配置

启动worker时指定队列:

celery -A project.celery_app worker -Q high_priority,default -l INFO

接口调用时指定优先级:

process_data.apply_async(
    kwargs={"data": data},
    queue='high_priority' if data.priority > 5 else 'default'
)

往期文章归档(示例)

免费在线工具(示例)

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12998.html

(0)
LomuLomu
上一篇 2025 年 7 月 27 日
下一篇 2025 年 7 月 29 日

相关推荐

  • 全网最全webstorm激活码领取方式,权威webstorm破解教程同步

    重要提示:本文涉及的WebStorm破解补丁及激活码来源于网络搜集,严禁用于商业用途,仅限个人学习研究。如内容存在侵权问题,请联系本人删除。经济状况允许的话,强烈建议支持正版软件! WebStorm作为JetBrains公司旗下一款功能全面的开发编辑工具,完美兼容Windows、Mac和Linux三大操作系统。本篇指南将手把手教你利用破解补丁完成永久激活,畅…

    2026 年 1 月 10 日
    19200
  • 2026国内ChatGPT Plus充值4种有效方法

    在AI提升生产力的今天,ChatGPT无疑是顶尖的效率工具,但受支付环境限制,国内用户想要订阅ChatGPT Plus往往卡在支付环节。今天不玩虚的,直接给大家拆解目前主流可用的4种充值路径,不管是怕麻烦的新手小白,还是爱折腾的极客玩家,都能找到适配自己的方案。 方法一:代充服务(最省心,新手首选) 适用人群: 追求快准稳,不想折腾海外支付环境,想要1分钟快…

    2026 年 3 月 29 日
    29100
  • IntelliJ IDEA 2026年 最新破解教程 插件激活

    本教程适用于IDEA、PyCharm、DataGrip、Goland等,支持Jetbrains全家桶! 废话不多说,先上最新 IDEA 版本破解成功的截图,如下,可以看到已经成功破解到 2099 年辣,舒服! 接下来,我就将通过图文的方式, 来详细讲解如何激活 IDEA至 2099 年。 当然这个激活方法,同样适用于之前的旧版本! 不管你是什么操作系统,什么…

    IDEA破解教程 2026 年 1 月 27 日
    29600
  • chatgpt会员充值先把需求想明白

    准备做 chatgpt会员充值 的人,很多并不是不会付款,而是没先想清楚这次开通到底是为了马上赶任务、延续现有账号,还是为了接下来一段时间保持稳定高频使用。需求没理顺时,人很容易一边找入口一边改主意,前面看着像在推进,后面却总要回头补确认。 如果你已经准备开通,先用一个支持国内支付、到账节奏更清楚的入口处理,通常会比临时切换多个页面更省心。 ChatGPT …

    ChatGPT 2026 年 4 月 29 日
    12200
  • 2026年IntelliJ IDEA激活码与破解补丁区别说明

    声明:本文分享的 IntelliJ IDEA 破解补丁与激活码均来源于网络,仅限个人学习测试使用,请勿用于商业目的。若涉及侵权请联系删除。鼓励有条件的开发者支持正版软件! 话不多说,先展示 IDEA 2025.2.1 版本成功激活的截图。从下图可见,软件已顺利破解至 2099 年,使用体验非常顺畅! 接下来,我将通过图文结合的方式,详细讲解如何对最新版 ID…

    IDEA破解教程 2026 年 5 月 19 日
    10900

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信