第3部:AIとバックエンド高度化 Step 8 / 36

非同期処理の実装

メール送信、画像処理、外部API連携など、時間のかかる処理をバックグラウンドで実行する方法をAIと一緒に実装します。

非同期処理が必要な場面

メール送信

ユーザー登録時の確認メール、通知メールなど

画像・動画処理

サムネイル生成、リサイズ、フォーマット変換

外部API連携

決済処理、SNS投稿、Webhook送信

重い計算処理

レポート生成、データ集計、AI推論

Celery + Redis による実装

AIへの指示:Celery環境構築

FastAPI + Celery + Redis で非同期タスク処理を実装したいです。

【環境】
- Docker Compose で構築
- Redis をメッセージブローカーとして使用
- 結果もRedisに保存

【タスク例】
- ユーザー登録時にウェルカムメールを送信
- 画像アップロード時にサムネイルを生成

以下を作成してください:
1. docker-compose.yml の追記
2. Celeryの設定ファイル
3. タスクの定義
4. FastAPIからタスクを呼び出す方法

docker-compose.yml

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  celery_worker:
    build: ./backend
    command: celery -A app.celery_app worker --loglevel=info
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  celery_beat:  # 定期実行用
    build: ./backend
    command: celery -A app.celery_app beat --loglevel=info
    depends_on:
      - redis

タスク定義例

# app/tasks/email.py
from app.celery_app import celery_app

@celery_app.task
def send_welcome_email(user_email: str, user_name: str):
    """ウェルカムメールを送信(非同期)"""
    # メール送信処理
    print(f"Sending welcome email to {user_email}")
    # ... 実際のメール送信処理
    return {"status": "sent", "email": user_email}

FastAPIから呼び出し

# app/api/users.py
from app.tasks.email import send_welcome_email

@router.post("/users/")
async def create_user(user: UserCreate):
    # ユーザー作成
    new_user = await user_service.create(user)

    # 非同期でメール送信(すぐにレスポンスを返す)
    send_welcome_email.delay(new_user.email, new_user.name)

    return new_user

タスクの状態管理

AIへの指示:進捗追跡

非同期タスクの進捗をフロントエンドで確認できるようにしたいです。

【要件】
- タスクのID、状態(pending/running/completed/failed)、進捗率を取得
- フロントエンドからポーリングで状態確認
- 完了時に結果を取得

実装方法を教えてください。

タスク状態確認API

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None
    }

まとめ

  • 時間のかかる処理は非同期化 - ユーザーを待たせない
  • Celery + Redis - Python での定番構成
  • .delay() で呼び出し - 即座にレスポンスを返せる
  • 状態追跡 - AsyncResultで進捗確認
既存コードのリファクタリング 次へ:キャッシュ設計と実装