在 Windows 下使用 FastAPI 和 Celery

在 Windows 环境下部署 FastAPI 应用并与 Celery 配合使用,确保任务结果能够准确地返回到发起请求的客户端,涉及以下步骤:

  1. 安装必要的包

首先,确保安装了 FastAPI、Uvicorn(用于运行 FastAPI 应用)、Celery 以及 Redis(作为消息代理和结果后端):

pip install fastapi uvicorn celery redis gevent

注意:由于 Windows 不支持 fork,需要安装 gevent 并在启动 Celery 时指定使用该库。

  1. 配置 Celery

在 Windows 上运行 Celery 时,需要使用 gevent 作为并发池。创建一个名为 celery_config.py 的文件,内容如下:

from celery import Celery

celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

celery_app.conf.update(
    worker_pool='gevent',  # 使用 gevent 作为并发池
    worker_concurrency=4   # 设置并发数,根据需要调整
)
  1. 定义 Celery 任务

在同一目录下创建一个名为 tasks.py 的文件,定义任务函数:

from celery_config import celery_app

@celery_app.task
def add(x, y):
    return x + y
  1. 创建 FastAPI 应用

创建一个名为 main.py 的文件,设置 FastAPI 应用并配置 WebSocket 端点:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uuid
import redis
from celery_config import celery_app

app = FastAPI()

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

active_connections = {}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    client_id = str(uuid.uuid4())
    active_connections[client_id] = websocket
    try:
        while True:
            data = await websocket.receive_text()
            task = add.apply_async(args=[int(data), 2], task_id=client_id)
            redis_client.set(task.id, client_id)
    except WebSocketDisconnect:
        del active_connections[client_id]

@app.on_event("startup")
async def startup_event():
    import asyncio
    asyncio.create_task(listen_for_results())

async def listen_for_results():
    pubsub = redis_client.pubsub()
    pubsub.subscribe('task_results')
    while True:
        message = pubsub.get_message()
        if message and message['type'] == 'message':
            task_id = message['data'].decode()
            client_id = redis_client.get(task_id).decode()
            if client_id in active_connections:
                connection = active_connections[client_id]
                result = redis_client.get(f'result_{task_id}').decode()
                await connection.send_text(f"Task result: {result}")
                redis_client.delete(task_id)
                redis_client.delete(f'result_{task_id}')
        await asyncio.sleep(0.1)
  1. 启动 FastAPI 应用

使用 Uvicorn 启动 FastAPI 应用,设置多个工作进程以提高性能:

uvicorn main:app --workers 4
  1. 启动 Celery Worker

在 Windows 上启动 Celery Worker 时,指定使用 gevent 作为并发池,并设置并发数:

celery -A celery_config.celery_app worker -l info --pool=gevent --concurrency=4

注意:在 Windows 上,Celery 的并发模式可能会受到限制,建议使用 geventsolo 池。

  1. 确保 Redis 服务运行

确保 Redis 服务器在本地运行,监听 6379 端口,供 Celery 和 FastAPI 应用使用。

总结

通过上述步骤,您可以在 Windows 环境下部署一个多实例的 FastAPI 应用,并使用 Celery 处理异步任务。FastAPI 应用通过 WebSocket 接收客户端请求,提交给 Celery 进行处理,处理结果通过 Redis 返回给对应的客户端。这种架构确保了任务处理的异步性和高效性,同时利用 Redis 实现了任务结果的分发。

注意事项

  • 在 Windows 环境下,Celery 的并发模型可能存在限制,建议使用 geventsolo 池。
  • 确保 Redis 服务在所有组件中可用,避免因连接问题导致的任务处理失败。
  • 根据实际需求,调整 Celery 的并发数和 FastAPI 的工作进程数,以优化性能。

处理超时任务

在使用 Celery 处理异步任务时,可能会遇到任务执行失败或超时的情况。为确保系统的可靠性和稳定性,建议采取以下措施:

  1. 设置任务超时时间

您可以为 Celery 任务设置软超时(soft_time_limit)和硬超时(time_limit):

  • 软超时:当任务执行超过软超时时间时,Celery 会发送 SoftTimeLimitExceeded 异常到任务中,您可以在任务中捕获此异常并进行适当处理,例如清理资源或记录日志。
  • 硬超时:当任务执行超过硬超时时间时,Celery 会强制终止任务,可能会导致任务状态为失败。

示例代码:

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(soft_time_limit=360, time_limit=400)
def my_task():
    try:
        # 任务逻辑
        pass
    except SoftTimeLimitExceeded:
        # 处理软超时,例如清理资源
        pass

freebuf.com

  1. 任务失败重试机制

为提高任务的可靠性,您可以为任务设置重试策略,例如在任务失败时自动重试。这对于处理临时故障或网络问题非常有用。

示例代码:

from celery import Celery
from celery.exceptions import MaxRetriesExceededError
from time import sleep

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def my_task(self):
    try:
        # 任务逻辑
        pass
    except Exception as exc:
        try:
            self.retry(exc=exc)
        except MaxRetriesExceededError:
            # 处理超过最大重试次数的情况
            pass

cnblogs.com

  1. 监控和日志记录

定期检查 Celery 的日志文件,以便及时发现和解决任务执行中的问题。您可以使用 Flower 等工具来实时监控任务的状态和性能。

cnblogs.com

  1. 资源限制和优化

确保 Celery Worker 的并发数(worker_concurrency)设置合理,以避免过度消耗系统资源。同时,优化任务代码,避免长时间的阻塞操作,必要时将耗时操作拆分为多个子任务。

通过以上措施,您可以有效地处理 Celery 任务执行中的失败和超时情况,提高系统的健壮性和可靠性。

在 Workder 中调用子进程

在 Celery Worker 中调用子进程时,需要注意以下几点:

  1. 并发模型影响

Celery 默认使用多进程模型(prefork)作为并发引擎。每个 Worker 进程在启动时会创建子进程来处理任务。如果在任务处理中再次创建子进程,可能会导致子进程数量过多,增加系统负担,甚至耗尽系统资源。

  1. 资源管理

在任务中创建子进程时,应确保合理管理系统资源。避免创建过多子进程,防止系统资源耗尽。

  1. 跨平台兼容性

在 Windows 平台上,Celery 使用 spawn 方法来启动子进程,这与 Unix 系统的 fork 方法有所不同。在子进程中调用新的子进程时,可能会遇到兼容性问题。建议在任务中创建子进程时,使用跨平台的方式,例如使用 multiprocessing 模块,并确保子进程的创建方式在不同操作系统上兼容。

  1. 错误处理

在任务中创建子进程时,应添加适当的错误处理机制。捕获子进程可能抛出的异常,确保主进程能够正确处理这些异常,防止任务执行失败。

示例代码

import os
import sys
import time
import logging
from celery import Celery
from multiprocessing import Process

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_task(data):
    try:
        # 创建子进程处理任务
        p = Process(target=process_data, args=(data,))
        p.start()
        p.join()
    except Exception as e:
        logging.error(f"Error in process_task: {e}")
        raise

def process_data(data):
    try:
        # 子进程处理逻辑
        result = data * 2  # 示例操作
        time.sleep(1)
        logging.info(f"Processed result: {result}")
    except Exception as e:
        logging.error(f"Error in process_data: {e}")
        raise

注意事项

  • 资源限制:在创建子进程时,确保系统资源允许。避免在高并发场景下频繁创建子进程,可能导致性能下降。
  • 跨平台兼容性:在 Windows 平台上,子进程的创建方式与 Unix 系统不同。使用 multiprocessing 模块时,注意在 Windows 上需要将代码放在 if __name__ == '__main__': 保护下,以避免递归创建子进程的问题。
  • 错误处理:在创建和管理子进程时,添加适当的错误处理机制,确保系统的稳定性。

通过遵循上述建议,可以在 Celery Worker 中安全地调用子进程,确保任务的正确执行和系统的稳定性。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇