第3部:MySQL連携 Step 9 / 24

FastAPI + MySQL

FastAPIとMySQLをSQLAlchemyで接続し、PythonからMySQLを操作する方法を学びます。

必要なパッケージ

requirements.txt

fastapi==0.109.0
uvicorn[standard]==0.27.0
sqlalchemy==2.0.25
pymysql==1.1.0
cryptography==42.0.0
python-dotenv==1.0.0
pydantic[email]==2.5.3
passlib[bcrypt]==1.7.4
bcrypt==4.0.1

pymysql: PythonからMySQLに接続するドライバー
sqlalchemy: PythonのORM(Object-Relational Mapping)

データベース接続設定

database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
import os

# Connection URL comes from the environment; the fallback targets a local
# development MySQL instance.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "mysql+pymysql://myuser:mypassword@localhost:3306/myapp"
)

# SQL statement logging is intended for development only, so it can be turned
# off by setting SQL_ECHO to anything other than 1/true/yes/on.
# When SQL_ECHO is unset the previous behaviour (logging enabled) is kept.
SQL_ECHO = os.getenv("SQL_ECHO", "true").strip().lower() in ("1", "true", "yes", "on")

# Engine
engine = create_engine(
    DATABASE_URL,
    echo=SQL_ECHO,  # log emitted SQL (development only)
    pool_pre_ping=True,  # test pooled connections before handing them out
)

# Session factory: explicit commits, no automatic flush before queries
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that all ORM models inherit from
Base = declarative_base()

# Dependency-injection helper
def get_db():
    """Yield a session created by ``SessionLocal`` and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the code after
    ``yield`` runs once the consumer is finished with the session.
    """
    # Leaving the ``with`` block closes the session, including on error.
    with SessionLocal() as session:
        yield session

接続URL形式:mysql+pymysql://ユーザー:パスワード@ホスト:ポート/データベース名

モデル定義

models.py

from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base


class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # E-mail must be present and unique; indexed for lookups by address.
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(100), nullable=False)
    # Only the password hash is stored in this table.
    password_hash = Column(String(255), nullable=False)
    # Both timestamps default to the database's now(); updated_at is also
    # set to now() by SQLAlchemy when the row is updated.
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    # Relationship: posts written by this user (other side is Post.author).
    # "all, delete-orphan" cascades session operations, including delete,
    # to the user's posts.
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")


class Post(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    # Required foreign key to users.id; indexed for lookups by author.
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    # Both timestamps default to the database's now(); updated_at is also
    # set to now() by SQLAlchemy when the row is updated.
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    # Relationship: the user who wrote this post (other side is User.posts).
    author = relationship("User", back_populates="posts")

Pydanticスキーマ

schemas.py

from datetime import datetime

from pydantic import BaseModel, ConfigDict, EmailStr


# User schemas
# Fields shared by every user schema below.
class UserBase(BaseModel):
    email: EmailStr  # validated as an e-mail address
    name: str

# Input schema for creating a user: UserBase plus the password.
class UserCreate(UserBase):
    password: str

# Output schema for a user: UserBase plus id and creation time.
# It has no password field, so no password data is serialized.
class UserResponse(UserBase):
    id: int
    created_at: datetime

    # Pydantic v2 configuration. from_attributes lets the schema be built by
    # reading attributes from an object (such as an ORM row) instead of a dict.
    # This replaces the class-based `Config`, which is deprecated in v2.
    model_config = ConfigDict(from_attributes=True)


# Post schemas
# Fields shared by every post schema below.
class PostBase(BaseModel):
    title: str
    content: str

# Input schema for creating a post; adds nothing to PostBase.
class PostCreate(PostBase):
    pass

# Output schema for a post: PostBase plus id, author id and timestamps.
class PostResponse(PostBase):
    id: int
    author_id: int
    created_at: datetime
    updated_at: datetime

    # Pydantic v2 configuration. from_attributes lets the schema be built by
    # reading attributes from an object (such as an ORM row) instead of a dict.
    # This replaces the class-based `Config`, which is deprecated in v2.
    model_config = ConfigDict(from_attributes=True)

# PostResponse with the author embedded as a nested UserResponse.
class PostWithAuthor(PostResponse):
    author: UserResponse

CRUD操作

crud.py

from sqlalchemy.orm import Session
from models import User, Post
from schemas import UserCreate, PostCreate
from passlib.context import CryptContext

# Module-level passlib context configured with the bcrypt scheme; used below
# to hash passwords before they are stored.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


# ========== User ==========
def get_user(db: Session, user_id: int):
    """Return the user whose id equals ``user_id``, or ``None`` if there is none."""
    by_id = db.query(User).filter_by(id=user_id)
    return by_id.first()

def get_user_by_email(db: Session, email: str):
    """Return the user registered under ``email``, or ``None`` if there is none."""
    by_email = db.query(User).filter_by(email=email)
    return by_email.first()

def create_user(db: Session, user: UserCreate):
    """Insert a new user and return the stored row.

    The password from ``user`` is hashed with ``pwd_context`` and only the
    hash is written to ``password_hash``.
    """
    new_user = User(
        email=user.email,
        name=user.name,
        password_hash=pwd_context.hash(user.password),
    )
    db.add(new_user)
    db.commit()
    # Reload the instance so database-generated values are populated.
    db.refresh(new_user)
    return new_user


# ========== Post ==========
def get_posts(db: Session, skip: int = 0, limit: int = 10):
    """Return one page of posts, newest first, with their authors preloaded.

    Args:
        db: Active database session.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        A list of ``Post`` objects ordered by ``created_at`` descending, with
        ``id`` descending as a tie-breaker.
    """
    # Local import so this function does not rely on the module's import list.
    from sqlalchemy.orm import joinedload

    return (
        db.query(Post)
        # Load each post's author in the same SELECT; otherwise reading
        # post.author for every row issues one extra query per post.
        .options(joinedload(Post.author))
        # created_at alone is not unique, so rows with equal timestamps could
        # move between pages; id makes the ordering total and stable.
        .order_by(Post.created_at.desc(), Post.id.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

def get_post(db: Session, post_id: int):
    """Return the post whose id equals ``post_id``, or ``None`` if there is none."""
    by_id = db.query(Post).filter_by(id=post_id)
    return by_id.first()

def create_post(db: Session, post: PostCreate, author_id: int):
    """Insert a post built from ``post`` for ``author_id`` and return the stored row."""
    fields = post.model_dump()
    new_post = Post(**fields, author_id=author_id)
    db.add(new_post)
    db.commit()
    # Reload the instance so database-generated values are populated.
    db.refresh(new_post)
    return new_post

def update_post(db: Session, post_id: int, post: PostCreate):
    """Overwrite the title and content of post ``post_id``.

    Returns the refreshed post, or ``None`` when no such post exists.
    """
    existing = db.query(Post).filter_by(id=post_id).first()
    if existing is None:
        return None

    existing.title = post.title
    existing.content = post.content
    db.commit()
    db.refresh(existing)
    return existing

def delete_post(db: Session, post_id: int):
    """Delete post ``post_id``; return ``True`` if it existed, else ``False``."""
    target = db.query(Post).filter_by(id=post_id).first()
    if target is None:
        return False

    db.delete(target)
    db.commit()
    return True

APIエンドポイント

main.py

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from database import engine, get_db, Base
import models
import schemas
import crud

# Create tables at import time for the models registered on Base.
# The `models` import above is what registers them, so it must run first.
Base.metadata.create_all(bind=engine)

# The FastAPI application that the route decorators below attach to.
app = FastAPI()


# ========== User エンドポイント ==========
@app.post("/users", response_model=schemas.UserResponse, status_code=201)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """Register a new user and return it with status 201.

    Raises:
        HTTPException: 400 when the e-mail address is already registered.
    """
    # Local import so this handler does not rely on the module's import list.
    from sqlalchemy.exc import IntegrityError

    if crud.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    try:
        return crud.create_user(db=db, user=user)
    except IntegrityError:
        # Two concurrent requests can both pass the check above; the second
        # insert then fails on a database constraint (presumably the unique
        # e-mail). Report it as the same 400 instead of an unhandled 500.
        db.rollback()
        raise HTTPException(status_code=400, detail="Email already registered") from None


# ========== Post エンドポイント ==========
@app.get("/posts", response_model=list[schemas.PostWithAuthor])
def read_posts(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
    """Return one page of posts, each with its author.

    Args:
        skip: Number of posts to skip; must not be negative.
        limit: Maximum number of posts to return; must not be negative.

    Raises:
        HTTPException: 422 when ``skip`` or ``limit`` is negative.
    """
    # Negative values would be passed into OFFSET/LIMIT and fail inside the
    # database as a server error; reject them as a client error instead.
    if skip < 0 or limit < 0:
        raise HTTPException(
            status_code=422,
            detail="skip and limit must be non-negative",
        )
    return crud.get_posts(db, skip=skip, limit=limit)

@app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
def read_post(post_id: int, db: Session = Depends(get_db)):
    """Return the post ``post_id`` with its author, or respond with 404."""
    found = crud.get_post(db, post_id=post_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="Post not found")

@app.post("/posts", response_model=schemas.PostResponse, status_code=201)
def create_post(
    post: schemas.PostCreate,
    author_id: int,  # to be taken from authentication later
    db: Session = Depends(get_db)
):
    """Create a post for ``author_id`` and return it with status 201.

    Raises:
        HTTPException: 404 when no user with ``author_id`` exists.
    """
    # Without this check an unknown author_id would only be rejected by the
    # foreign key at commit time and surface as an unhandled server error.
    if crud.get_user(db, user_id=author_id) is None:
        raise HTTPException(status_code=404, detail="Author not found")
    return crud.create_post(db=db, post=post, author_id=author_id)

@app.put("/posts/{post_id}", response_model=schemas.PostResponse)
def update_post(
    post_id: int,
    post: schemas.PostCreate,
    db: Session = Depends(get_db)
):
    """Replace the title and content of post ``post_id``, or respond with 404."""
    updated = crud.update_post(db, post_id=post_id, post=post)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="Post not found")

@app.delete("/posts/{post_id}", status_code=204)
def delete_post(post_id: int, db: Session = Depends(get_db)):
    """Delete post ``post_id`` and respond with 204, or with 404 if it is missing."""
    if crud.delete_post(db, post_id=post_id):
        return None
    raise HTTPException(status_code=404, detail="Post not found")

ファイル構成

backend/
├── main.py           # FastAPIアプリ・エンドポイント
├── database.py       # DB接続設定
├── models.py         # SQLAlchemyモデル
├── schemas.py        # Pydanticスキーマ
├── crud.py           # CRUD操作
├── requirements.txt  # パッケージ
└── Dockerfile

まとめ

  • SQLAlchemy + pymysqlでMySQLに接続
  • モデル(models.py)でテーブル構造を定義
  • スキーマ(schemas.py)でAPIの入出力を定義
  • CRUD操作を関数として分離
  • Dependsでセッションを依存性注入
MySQL入門 次へ:マイグレーション