Python 的 async/await 可以處理 I/O 並發,但 CPU 密集或需要長時間等待外部 API(像是呼叫 LLM)的任務,塞在 request/response 週期裡一樣會讓 API 變慢。解法是把這些任務推進背景佇列,讓 worker process 非同步消化。
Celery 是 Python 生態裡做這件事的標準工具,成熟、有完整生態、FastAPI / Django / Flask 都支援。
Celery 是什麼
Celery 是分散式任務佇列框架,由三個角色組成:
- Producer:把任務推進佇列的那一方(通常是 API server)
- Broker:訊息中介,存放待處理的任務(用 Redis 或 RabbitMQ)
- Worker:從 broker 取任務出來執行的 process
FastAPI (Producer)
│
▼
Redis (Broker)
│
▼
Celery Worker(s)
Worker 可以開多個 process,水平擴展很直接。Broker 負責確保每個任務只被一個 worker 取走(at-least-once 語意,需要 idempotency 要自己處理)。
安裝
pip install celery redis
基本設定
celery_app.py
from celery import Celery
app = Celery(
'daodao_ai',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1', # 存任務結果
)
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Taipei',
enable_utc=True,
)
Broker 存待處理任務,Backend 存執行結果。如果不需要查詢結果,可以不設 backend。
定義和呼叫任務
定義任務
from celery_app import app
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60, # 秒
)
def generate_ai_feedback(self, user_id: str, practice_id: str):
try:
# 呼叫 LLM 生成學習回饋
feedback = llm_client.generate(
prompt=build_feedback_prompt(user_id, practice_id)
)
save_feedback(practice_id, feedback)
return {"status": "ok", "practice_id": practice_id}
except Exception as exc:
raise self.retry(exc=exc)
從 FastAPI 呼叫
from fastapi import FastAPI
from tasks import generate_ai_feedback
api = FastAPI()
@api.post("/practices/{practice_id}/feedback")
async def request_feedback(practice_id: str, user_id: str):
# 丟進佇列,立即回應 202
task = generate_ai_feedback.delay(user_id, practice_id)
return {"task_id": task.id, "status": "queued"}
@api.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
from celery.result import AsyncResult
result = AsyncResult(task_id)
return {"status": result.status, "result": result.result}
排程任務(Celery Beat)
Celery Beat 是排程器,類似 crontab,讓任務定時執行。
from celery.schedules import crontab
app.conf.beat_schedule = {
# 每天凌晨 2 點更新推薦快取
'refresh-recommendations': {
'task': 'tasks.refresh_recommendation_cache',
'schedule': crontab(hour=2, minute=0),
},
# 每小時跑一次 embedding 更新
'update-embeddings': {
'task': 'tasks.update_user_embeddings',
'schedule': crontab(minute=0),
},
}
啟動 Beat scheduler(獨立 process):
celery -A celery_app beat --loglevel=info
啟動 Worker
# 單一 worker,4 個並發
celery -A celery_app worker --concurrency=4 --loglevel=info
# 指定 queue(不同任務分到不同 queue)
celery -A celery_app worker -Q ai_tasks,default --concurrency=4
任務鏈(Chain)和分組(Group)
Celery 支援把多個任務串起來:
from celery import chain, group
# Chain:依序執行
result = chain(
preprocess_data.s(user_id),
generate_embeddings.s(),
update_recommendations.s(),
).delay()
# Group:並行執行,等所有完成
result = group(
generate_feedback.s(pid) for pid in practice_ids
).delay()
這在 AI pipeline 裡很有用:先做 embedding,再做向量搜尋,再做 LLM 生成,每一步都可以獨立 retry。
島島的 AI 服務怎麼用 Celery
島島的 Python AI 後端(daodao-ai-backend)是 FastAPI + Celery 的組合,Redis 同時當 BullMQ 的 broker 和 Celery 的 broker(用不同的 db index 隔開)。
主要任務類型
generate_ai_feedback:使用者完成學習實踐後,非同步呼叫 LLM 生成個人化回饋,可能要 10-30 秒,不能在 request 裡等update_user_embeddings:使用者更新學習目標或實踐記錄時,重新計算 embedding 存到 Qdrant,供推薦引擎使用refresh_recommendation_cache:定期從 ClickHouse 拿最新的行為資料,重新算推薦結果存到 Redis
FastAPI endpoint 收到請求
│
└── .delay() 把任務推進 Redis
│
Celery Worker 取走
│
├── 呼叫 LLM(Ollama / OpenAI)
├── 寫結果回 PostgreSQL
└── 更新 Redis 快取
Node.js 後端透過 HTTP 呼叫 FastAPI 觸發這些任務,只要 FastAPI 回 202 Accepted 就好,不用等 Celery 跑完。
監控:Flower
Celery 的官方監控工具是 Flower,可以看 worker 狀態、任務執行歷史、失敗任務:
pip install flower
celery -A celery_app flower --port=5555
取捨
優點
- Python 生態最成熟的選擇,Django / FastAPI / Flask 都有完整整合
- 功能完整:重試、排程、任務鏈、worker 優先級
- 水平擴展直接,多開 worker process 即可
- Flower 監控介面完整
缺點
- 設定比 BullMQ 複雜一些(broker + backend + beat scheduler 三個 process)
- 跨語言不友好:Node.js 的 BullMQ job 不能直接被 Celery worker 消費
- 序列化坑:task payload 預設用 pickle,要改成 json 才安全
- at-least-once 語意:任務可能被執行超過一次(worker crash 重啟),需要自己處理 idempotency
Celery vs BullMQ
| Celery | BullMQ | |
|---|---|---|
| 語言 | Python | Node.js |
| Broker | Redis / RabbitMQ | Redis(唯一) |
| 排程 | Celery Beat | 原生 Cron repeat |
| 任務鏈 | chain / group | Flow |
| 監控 | Flower | Bull Board |
語言決定選哪個。島島的 Python AI 服務用 Celery,Node.js 後端用 BullMQ,兩邊各自獨立,共用同一台 Redis(不同 db index)。
參考資料
- Celery 官方文件
- Celery + Redis 設定指南
- Flower 監控工具
- FastAPI + Celery 整合範例
- 島島阿學技術架構全覽 — Celery 在 Python AI 服務(FastAPI + Celery)中處理 LLM 回饋生成與 embedding 更新的實際架構