在 Windows 环境下部署 FastAPI 应用并与 Celery 配合使用,确保任务结果能够准确地返回到发起请求的客户端,涉及以下步骤:
- 安装必要的包:
首先,确保安装了 FastAPI、Uvicorn(用于运行 FastAPI 应用)、Celery 以及 Redis(作为消息代理和结果后端):
pip install fastapi uvicorn celery redis gevent
注意:由于 Windows 不支持 fork
,需要安装 gevent
并在启动 Celery 时指定使用该库。
- 配置 Celery:
在 Windows 上运行 Celery 时,需要使用 gevent
作为并发池。创建一个名为 celery_config.py
的文件,内容如下:
from celery import Celery
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/0'
)
celery_app.conf.update(
worker_pool='gevent', # 使用 gevent 作为并发池
worker_concurrency=4 # 设置并发数,根据需要调整
)
- 定义 Celery 任务:
在同一目录下创建一个名为 tasks.py
的文件,定义任务函数:
from celery_config import celery_app
@celery_app.task
def add(x, y):
return x + y
- 创建 FastAPI 应用:
创建一个名为 main.py
的文件,设置 FastAPI 应用并配置 WebSocket 端点:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uuid
import redis
from celery_config import celery_app
app = FastAPI()
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
active_connections = {}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = str(uuid.uuid4())
active_connections[client_id] = websocket
try:
while True:
data = await websocket.receive_text()
task = add.apply_async(args=[int(data), 2], task_id=client_id)
redis_client.set(task.id, client_id)
except WebSocketDisconnect:
del active_connections[client_id]
@app.on_event("startup")
async def startup_event():
import asyncio
asyncio.create_task(listen_for_results())
async def listen_for_results():
pubsub = redis_client.pubsub()
pubsub.subscribe('task_results')
while True:
message = pubsub.get_message()
if message and message['type'] == 'message':
task_id = message['data'].decode()
client_id = redis_client.get(task_id).decode()
if client_id in active_connections:
connection = active_connections[client_id]
result = redis_client.get(f'result_{task_id}').decode()
await connection.send_text(f"Task result: {result}")
redis_client.delete(task_id)
redis_client.delete(f'result_{task_id}')
await asyncio.sleep(0.1)
- 启动 FastAPI 应用:
使用 Uvicorn 启动 FastAPI 应用,设置多个工作进程以提高性能:
uvicorn main:app --workers 4
- 启动 Celery Worker:
在 Windows 上启动 Celery Worker 时,指定使用 gevent
作为并发池,并设置并发数:
celery -A celery_config.celery_app worker -l info --pool=gevent --concurrency=4
注意:在 Windows 上,Celery 的并发模式可能会受到限制,建议使用 gevent
或 solo
池。
- 确保 Redis 服务运行:
确保 Redis 服务器在本地运行,监听 6379 端口,供 Celery 和 FastAPI 应用使用。
总结:
通过上述步骤,您可以在 Windows 环境下部署一个多实例的 FastAPI 应用,并使用 Celery 处理异步任务。FastAPI 应用通过 WebSocket 接收客户端请求,提交给 Celery 进行处理,处理结果通过 Redis 返回给对应的客户端。这种架构确保了任务处理的异步性和高效性,同时利用 Redis 实现了任务结果的分发。
注意事项:
- 在 Windows 环境下,Celery 的并发模型可能存在限制,建议使用
gevent
或solo
池。 - 确保 Redis 服务在所有组件中可用,避免因连接问题导致的任务处理失败。
- 根据实际需求,调整 Celery 的并发数和 FastAPI 的工作进程数,以优化性能。
处理超时任务
在使用 Celery 处理异步任务时,可能会遇到任务执行失败或超时的情况。为确保系统的可靠性和稳定性,建议采取以下措施:
- 设置任务超时时间:
您可以为 Celery 任务设置软超时(soft_time_limit
)和硬超时(time_limit
):
- 软超时:当任务执行超过软超时时间时,Celery 会发送
SoftTimeLimitExceeded
异常到任务中,您可以在任务中捕获此异常并进行适当处理,例如清理资源或记录日志。 - 硬超时:当任务执行超过硬超时时间时,Celery 会强制终止任务,可能会导致任务状态为失败。
示例代码:
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(soft_time_limit=360, time_limit=400)
def my_task():
try:
# 任务逻辑
pass
except SoftTimeLimitExceeded:
# 处理软超时,例如清理资源
pass
- 任务失败重试机制:
为提高任务的可靠性,您可以为任务设置重试策略,例如在任务失败时自动重试。这对于处理临时故障或网络问题非常有用。
示例代码:
from celery import Celery
from celery.exceptions import MaxRetriesExceededError
from time import sleep
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task(bind=True, max_retries=3, default_retry_delay=30)
def my_task(self):
try:
# 任务逻辑
pass
except Exception as exc:
try:
self.retry(exc=exc)
except MaxRetriesExceededError:
# 处理超过最大重试次数的情况
pass
- 监控和日志记录:
定期检查 Celery 的日志文件,以便及时发现和解决任务执行中的问题。您可以使用 Flower 等工具来实时监控任务的状态和性能。
- 资源限制和优化:
确保 Celery Worker 的并发数(worker_concurrency
)设置合理,以避免过度消耗系统资源。同时,优化任务代码,避免长时间的阻塞操作,必要时将耗时操作拆分为多个子任务。
通过以上措施,您可以有效地处理 Celery 任务执行中的失败和超时情况,提高系统的健壮性和可靠性。
在 Workder 中调用子进程
在 Celery Worker 中调用子进程时,需要注意以下几点:
- 并发模型影响:
Celery 默认使用多进程模型(prefork)作为并发引擎。每个 Worker 进程在启动时会创建子进程来处理任务。如果在任务处理中再次创建子进程,可能会导致子进程数量过多,增加系统负担,甚至耗尽系统资源。
- 资源管理:
在任务中创建子进程时,应确保合理管理系统资源。避免创建过多子进程,防止系统资源耗尽。
- 跨平台兼容性:
在 Windows 平台上,Celery 使用 spawn
方法来启动子进程,这与 Unix 系统的 fork
方法有所不同。在子进程中调用新的子进程时,可能会遇到兼容性问题。建议在任务中创建子进程时,使用跨平台的方式,例如使用 multiprocessing
模块,并确保子进程的创建方式在不同操作系统上兼容。
- 错误处理:
在任务中创建子进程时,应添加适当的错误处理机制。捕获子进程可能抛出的异常,确保主进程能够正确处理这些异常,防止任务执行失败。
示例代码:
import os
import sys
import time
import logging
from celery import Celery
from multiprocessing import Process
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def process_task(data):
try:
# 创建子进程处理任务
p = Process(target=process_data, args=(data,))
p.start()
p.join()
except Exception as e:
logging.error(f"Error in process_task: {e}")
raise
def process_data(data):
try:
# 子进程处理逻辑
result = data * 2 # 示例操作
time.sleep(1)
logging.info(f"Processed result: {result}")
except Exception as e:
logging.error(f"Error in process_data: {e}")
raise
注意事项:
- 资源限制:在创建子进程时,确保系统资源允许。避免在高并发场景下频繁创建子进程,可能导致性能下降。
- 跨平台兼容性:在 Windows 平台上,子进程的创建方式与 Unix 系统不同。使用
multiprocessing
模块时,注意在 Windows 上需要将代码放在if __name__ == '__main__':
保护下,以避免递归创建子进程的问题。 - 错误处理:在创建和管理子进程时,添加适当的错误处理机制,确保系统的稳定性。
通过遵循上述建议,可以在 Celery Worker 中安全地调用子进程,确保任务的正确执行和系统的稳定性。