Final Real Project in FastAPI

What We're Building

A complete Task Management API — like a mini Trello/Jira backend.

This project uses everything from all 6 stages together in a real, production-quality structure. When done, you'll have a portfolio project you can actually show.


Features

✅ User registration and login (JWT auth)
✅ Workspaces — users create workspaces, invite members
✅ Projects — inside workspaces
✅ Tasks — inside projects, assign to members
✅ Comments — on tasks
✅ File attachments — on tasks
✅ Role-based access — owner, admin, member
✅ Background email notifications
✅ Pagination on all list endpoints
✅ Full filtering and search
✅ Consistent API responses
✅ Complete /docs documentation

Final Project Structure

task-manager-api/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── workspaces.py
│   ├── projects.py
│   ├── tasks.py
│   └── comments.py
├── uploads/
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth_utils.py
├── dependencies.py
├── config.py
├── utils.py
└── requirements.txt

Step 1 — Setup

mkdir task-manager-api
cd task-manager-api
python -m venv venv
venv\Scripts\activate     # Windows
source venv/bin/activate  # Mac/Linux

pip install fastapi uvicorn sqlalchemy "passlib[bcrypt]" "python-jose[cryptography]" python-multipart python-dotenv pydantic-settings

pip freeze > requirements.txt

Create .env:

SECRET_KEY=super-secret-key-make-this-very-long-and-random-in-production
DATABASE_URL=sqlite:///./taskmanager.db
ACCESS_TOKEN_EXPIRE_MINUTES=60
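
The SECRET_KEY above is a placeholder. One way to produce a long random value is Python's standard `secrets` module. This is a minimal sketch; the helper name `generate_secret_key` and the default of 64 bytes are assumptions made for illustration, not part of the project files.

```python
import secrets


def generate_secret_key(num_bytes: int = 64) -> str:
    """Return a URL-safe random string suitable for use as SECRET_KEY."""
    # token_urlsafe draws from the OS's cryptographically secure random source.
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Paste the printed value into .env as SECRET_KEY=<value>
    print(generate_secret_key())
```

Run it once, copy the output into .env, and keep that file out of version control.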

Create .gitignore:

venv/
__pycache__/
*.pyc
.env
*.db
uploads/

Step 2 — config.py

# config.py

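from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # Pydantic v2 style configuration; replaces the older inner `class Config`.
    model_config = SettingsConfigDict(env_file=".env")

    secret_key: str
    database_url: str = "sqlite:///./taskmanager.db"
    access_token_expire_minutes: int = 60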


settings = Settings()

Step 3 — database.py

# database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from config import settings

# check_same_thread is a SQLite-only option; other drivers reject it.
connect_args = (
    {"check_same_thread": False}
    if settings.database_url.startswith("sqlite")
    else {}
)

engine = create_engine(settings.database_url, connect_args=connect_args)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
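
The `try`/`finally` around `yield` in get_db is what guarantees the session is closed even when an endpoint raises. The sketch below imitates that behaviour with the standard library only. `FakeSession`, `get_fake_db`, and `run_endpoint` are hypothetical stand-ins, and the way `run_endpoint` drives the generator is a simplification of what a framework does, not FastAPI's actual code.

```python
class FakeSession:
    """Stand-in for a database session; only records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_fake_db():
    db = FakeSession()
    try:
        yield db      # the endpoint body runs while the generator is paused here
    finally:
        db.close()    # runs on normal completion and when an error is thrown in


def run_endpoint(fail: bool) -> FakeSession:
    """Drive the generator roughly the way a framework would."""
    gen = get_fake_db()
    db = next(gen)                # obtain the session
    try:
        if fail:
            raise RuntimeError("endpoint failed")
    except RuntimeError as exc:
        try:
            gen.throw(exc)        # raise inside the generator so `finally` runs
        except RuntimeError:
            pass                  # the error propagates back out; swallow it here
    else:
        next(gen, None)           # resume past `yield`; the generator finishes
    return db
```

Calling `run_endpoint(fail=True)` and `run_endpoint(fail=False)` both return a session whose `closed` flag is set.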

Step 4 — models.py

This is the core — all database tables:

# models.py

from sqlalchemy import (
    Column, Integer, String, Boolean, DateTime,
    Text, ForeignKey, Enum as SQLEnum
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
import enum


# ── Enums ─────────────────────────────────────────

class WorkspaceRole(str, enum.Enum):
    owner = "owner"
    admin = "admin"
    member = "member"

class TaskStatus(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"
    in_review = "in_review"
    done = "done"

class TaskPriority(str, enum.Enum):
    low = "low"
    medium = "medium"
    high = "high"
    urgent = "urgent"


# ── Tables ────────────────────────────────────────

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, nullable=False, index=True)
    password = Column(String(255), nullable=False)
    avatar = Column(String(255), nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())

    workspace_memberships = relationship("WorkspaceMember", back_populates="user")
    assigned_tasks = relationship("Task", back_populates="assignee", foreign_keys="Task.assignee_id")
    comments = relationship("Comment", back_populates="author")


class Workspace(Base):
    __tablename__ = "workspaces"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    owner = relationship("User")
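    # Delete memberships and projects together with their workspace.
    members = relationship("WorkspaceMember", back_populates="workspace", cascade="all, delete-orphan")
    projects = relationship("Project", back_populates="workspace", cascade="all, delete-orphan")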


class WorkspaceMember(Base):
    __tablename__ = "workspace_members"

    id = Column(Integer, primary_key=True, index=True)
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    role = Column(SQLEnum(WorkspaceRole), default=WorkspaceRole.member)
    joined_at = Column(DateTime, server_default=func.now())

    workspace = relationship("Workspace", back_populates="members")
    user = relationship("User", back_populates="workspace_memberships")


class Project(Base):
    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)

    workspace = relationship("Workspace", back_populates="projects")
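    # Without a delete cascade, deleting a project would try to NULL the
    # non-nullable tasks.project_id and fail.
    tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan")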


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(SQLEnum(TaskStatus), default=TaskStatus.todo)
    priority = Column(SQLEnum(TaskPriority), default=TaskPriority.medium)
    due_date = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
    creator_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    assignee_id = Column(Integer, ForeignKey("users.id"), nullable=True)

    project = relationship("Project", back_populates="tasks")
    creator = relationship("User", foreign_keys=[creator_id])
    assignee = relationship("User", back_populates="assigned_tasks", foreign_keys=[assignee_id])
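    # Comments and attachments are removed together with their task.
    comments = relationship("Comment", back_populates="task", cascade="all, delete-orphan")
    attachments = relationship("Attachment", back_populates="task", cascade="all, delete-orphan")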


class Comment(Base):
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="comments")
    author = relationship("User", back_populates="comments")


class Attachment(Base):
    __tablename__ = "attachments"

    id = Column(Integer, primary_key=True, index=True)
    filename = Column(String(255), nullable=False)
    original_name = Column(String(255), nullable=False)
    file_size = Column(Integer, nullable=False)
    content_type = Column(String(100), nullable=False)
    uploaded_at = Column(DateTime, server_default=func.now())

    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
    uploaded_by = Column(Integer, ForeignKey("users.id"), nullable=False)

    task = relationship("Task", back_populates="attachments")
    uploader = relationship("User")

Step 5 — auth_utils.py

# auth_utils.py

from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
from config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict) -> str:
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime.
    expire = datetime.now(timezone.utc) + timedelta(minutes=settings.access_token_expire_minutes)
    payload.update({"exp": expire})
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


def verify_token(token: str) -> dict:
    try:
        return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"}
        )
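
To see what an HS256 token is made of, here is a standard-library sketch of signing and checking one. It is meant only to illustrate the idea (header and payload are base64url-encoded, then signed with HMAC-SHA256 using the secret). It is not how python-jose is implemented and not a replacement for it. The names `sign_token` and `check_token` are hypothetical, and `exp` is taken to be a Unix timestamp.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    padding = "=" * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()


def sign_token(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def check_token(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the payload, or raise ValueError if the token is invalid."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Malformed token")
    header_b64, payload_b64, signature_b64 = parts

    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking information through timing differences.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("Bad signature")

    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if payload.get("exp", float("inf")) <= current:
        raise ValueError("Token expired")
    return payload
```

A token signed with one secret fails the check with any other secret, which is why SECRET_KEY must stay private.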

Step 6 — schemas.py

# schemas.py

from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Generic, TypeVar
from models import TaskStatus, TaskPriority, WorkspaceRole

T = TypeVar("T")


# ── Generic Response Wrapper ──────────────────────

class APIResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Success"
    data: T | None = None


class PaginatedResponse(BaseModel, Generic[T]):
    success: bool = True
    data: list[T] = []
    total: int = 0
    page: int = 1
    per_page: int = 10
    total_pages: int = 0
    has_next: bool = False
    has_prev: bool = False


# ── User Schemas ─────────────────────────────────

class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str
    password: str = Field(min_length=8)

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        return v.lower().strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if v.isdigit():
            raise ValueError("Password cannot be only numbers")
        # isalpha() would let "abcdefg!" through; check for a digit directly.
        if not any(ch.isdigit() for ch in v):
            raise ValueError("Password must include at least one number")
        return v


class UserUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)


class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    avatar: str | None
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}


class LoginRequest(BaseModel):
    email: str
    password: str


class TokenResponse(BaseModel):
    access_token: str
    token_type: str = "bearer"
    user: UserResponse


# ── Workspace Schemas ─────────────────────────────

class WorkspaceCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class WorkspaceUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None


class WorkspaceMemberResponse(BaseModel):
    id: int
    user: UserResponse
    role: WorkspaceRole
    joined_at: datetime

    model_config = {"from_attributes": True}


class WorkspaceResponse(BaseModel):
    id: int
    name: str
    description: str | None
    owner_id: int
    created_at: datetime
    member_count: int = 0

    model_config = {"from_attributes": True}


class InviteMember(BaseModel):
    email: str
    role: WorkspaceRole = WorkspaceRole.member


# ── Project Schemas ───────────────────────────────

class ProjectCreate(BaseModel):
    name: str = Field(min_length=2, max_length=100)
    description: str | None = Field(default=None, max_length=500)


class ProjectUpdate(BaseModel):
    name: str | None = Field(default=None, min_length=2)
    description: str | None = None
    is_active: bool | None = None


class ProjectResponse(BaseModel):
    id: int
    name: str
    description: str | None
    is_active: bool
    workspace_id: int
    created_at: datetime
    task_count: int = 0

    model_config = {"from_attributes": True}


# ── Task Schemas ──────────────────────────────────

class TaskCreate(BaseModel):
    title: str = Field(min_length=3, max_length=200)
    description: str | None = None
    priority: TaskPriority = TaskPriority.medium
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskUpdate(BaseModel):
    title: str | None = Field(default=None, min_length=3)
    description: str | None = None
    status: TaskStatus | None = None
    priority: TaskPriority | None = None
    due_date: datetime | None = None
    assignee_id: int | None = None


class TaskResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    project_id: int
    creator_id: int
    assignee_id: int | None
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


class TaskDetailResponse(BaseModel):
    id: int
    title: str
    description: str | None
    status: TaskStatus
    priority: TaskPriority
    due_date: datetime | None
    created_at: datetime
    updated_at: datetime
    creator: UserResponse
    assignee: UserResponse | None
    comment_count: int = 0

    model_config = {"from_attributes": True}


# ── Comment Schemas ───────────────────────────────

class CommentCreate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentUpdate(BaseModel):
    content: str = Field(min_length=1, max_length=2000)


class CommentResponse(BaseModel):
    id: int
    content: str
    task_id: int
    author: UserResponse
    created_at: datetime
    updated_at: datetime

    model_config = {"from_attributes": True}


# ── Attachment Schema ─────────────────────────────

class AttachmentResponse(BaseModel):
    id: int
    filename: str
    original_name: str
    file_size: int
    content_type: str
    uploaded_at: datetime
    task_id: int

    model_config = {"from_attributes": True}

Step 7 — utils.py

# utils.py

from sqlalchemy.orm import Session


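def paginate(query, page: int = 1, per_page: int = 10) -> dict:
    """Apply pagination to any SQLAlchemy query."""
    # Clamp inputs so ?page=0 or ?per_page=0 cannot cause a negative offset
    # or a division by zero.
    page = max(page, 1)
    per_page = max(per_page, 1)
    total = query.count()
    total_pages = (total + per_page - 1) // per_page
    items = query.offset((page - 1) * per_page).limit(per_page).all()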

    return {
        "data": items,
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1
    }


def send_notification_email(to_email: str, subject: str, body: str):
    """Simulate sending email — replace with real email library in production."""
    print(f"\n📧 EMAIL TO: {to_email}")
    print(f"   SUBJECT: {subject}")
    print(f"   BODY: {body}\n")
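
The pagination arithmetic is easier to follow on a plain list. The sketch below mirrors the same formulas without a database. `paginate_list` is a hypothetical helper written for this illustration, and the clamping of `page` and `per_page` to a minimum of 1 is an assumption of the sketch.

```python
def paginate_list(items: list, page: int = 1, per_page: int = 10) -> dict:
    """Same page math as a query-based paginator, applied to a list."""
    page = max(page, 1)
    per_page = max(per_page, 1)

    total = len(items)
    # Ceiling division: 25 items at 10 per page gives 3 pages.
    total_pages = (total + per_page - 1) // per_page
    start = (page - 1) * per_page   # plays the role of OFFSET

    return {
        "data": items[start:start + per_page],   # plays the role of LIMIT
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }
```

With 25 items and 10 per page, page 3 holds the last 5 items, `has_next` is False, and `has_prev` is True.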

Step 8 — dependencies.py

# dependencies.py

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
import auth_utils
import models

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    payload = auth_utils.verify_token(token)
    user_id = payload.get("user_id")

    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = db.query(models.User).filter(models.User.id == user_id).first()

    if not user:
        raise HTTPException(status_code=401, detail="User not found")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    return user


def get_workspace_member(
    workspace_id: int,
    current_user: models.User = Depends(get_current_user),
    db: Session = Depends(get_db)
) -> models.WorkspaceMember:
    """Verify user is a member of the workspace."""
    membership = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == current_user.id
    ).first()

    if not membership:
        raise HTTPException(
            status_code=403,
            detail="You are not a member of this workspace"
        )

    return membership


def get_workspace_admin(
    membership: models.WorkspaceMember = Depends(get_workspace_member)
) -> models.WorkspaceMember:
    """Verify user is admin or owner of the workspace."""
    if membership.role not in [models.WorkspaceRole.owner, models.WorkspaceRole.admin]:
        raise HTTPException(
            status_code=403,
            detail="Admin access required"
        )
    return membership
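
get_workspace_admin compares the role against an explicit list. If more permission levels are needed later, one alternative is to rank the roles and compare ranks. This is only a sketch: `Role` is a local copy of the three role names, and `ROLE_RANK` and `has_at_least` are hypothetical names that do not exist in the project.

```python
import enum


class Role(str, enum.Enum):
    owner = "owner"
    admin = "admin"
    member = "member"


# Higher number means more privileges. The ordering is an assumption of this sketch.
ROLE_RANK = {Role.member: 1, Role.admin: 2, Role.owner: 3}


def has_at_least(actual: Role, required: Role) -> bool:
    """True when `actual` carries the same or more privileges than `required`."""
    return ROLE_RANK[actual] >= ROLE_RANK[required]
```

An admin-only dependency would then call `has_at_least(membership.role, Role.admin)`, and an owner-only one would pass `Role.owner`.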

Step 9 — All Routers

routers/auth.py

# routers/auth.py

from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import models
import schemas
import auth_utils
from database import get_db
from dependencies import get_current_user

router = APIRouter(prefix="/auth", tags=["Authentication"])


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
    if db.query(models.User).filter(models.User.email == user_data.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    hashed = auth_utils.hash_password(user_data.password)
    user = models.User(
        name=user_data.name,
        email=user_data.email,
        password=hashed
    )
    db.add(user)
    db.commit()
    db.refresh(user)
    return user


@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    user = db.query(models.User).filter(
        models.User.email == credentials.email.lower()
    ).first()

    if not user or not auth_utils.verify_password(credentials.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Account deactivated")

    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer", "user": user}


@router.post("/token")    # for /docs Authorize button
def login_for_docs(
    # The Authorize dialog posts form fields named `username` and `password`,
    # not JSON, so this endpoint reads form data. Enter the email as username.
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    user = db.query(models.User).filter(
        models.User.email == form_data.username.lower().strip()
    ).first()
    if not user or not auth_utils.verify_password(form_data.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer"}


@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user: models.User = Depends(get_current_user)):
    return current_user


@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
    update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    if update.name:
        current_user.name = update.name
    db.commit()
    db.refresh(current_user)
    return current_user
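
The two login routes are called differently. /auth/login takes a JSON body with `email` and `password`, while the Authorize dialog in /docs follows the OAuth2 password flow and submits form-encoded `username` and `password` fields to the token URL. The sketch below builds both request bodies with the standard library so the difference is visible. The helper names are hypothetical and no request is actually sent.

```python
import json
from urllib.parse import parse_qs, urlencode


def build_login_json(email: str, password: str) -> bytes:
    """Body for POST /auth/login (Content-Type: application/json)."""
    return json.dumps({"email": email, "password": password}).encode("utf-8")


def build_token_form(email: str, password: str) -> bytes:
    """Body an OAuth2 password-flow client sends to the token URL
    (Content-Type: application/x-www-form-urlencoded)."""
    # The OAuth2 field is called `username`; here it carries the email.
    return urlencode({"username": email, "password": password}).encode("ascii")


if __name__ == "__main__":
    print(build_login_json("ada@example.com", "s3cret pw"))
    print(build_token_form("ada@example.com", "s3cret pw"))
    # Round-trip the form body to show which fields the server side receives.
    print(parse_qs(build_token_form("ada@example.com", "s3cret pw").decode("ascii")))
```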

routers/workspaces.py

# routers/workspaces.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate, send_notification_email

router = APIRouter(prefix="/workspaces", tags=["Workspaces"])


@router.post("", response_model=schemas.WorkspaceResponse, status_code=201)
def create_workspace(
    data: schemas.WorkspaceCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    workspace = models.Workspace(
        name=data.name,
        description=data.description,
        owner_id=current_user.id
    )
    db.add(workspace)
    db.flush()    # get ID before commit

    # Add creator as owner member
    member = models.WorkspaceMember(
        workspace_id=workspace.id,
        user_id=current_user.id,
        role=models.WorkspaceRole.owner
    )
    db.add(member)
    db.commit()
    db.refresh(workspace)

    workspace.member_count = 1
    return workspace


@router.get("", response_model=schemas.PaginatedResponse[schemas.WorkspaceResponse])
def get_my_workspaces(
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Workspace).join(
        models.WorkspaceMember
    ).filter(
        models.WorkspaceMember.user_id == current_user.id
    )

    result = paginate(query, page, per_page)

    for ws in result["data"]:
        ws.member_count = db.query(models.WorkspaceMember).filter(
            models.WorkspaceMember.workspace_id == ws.id
        ).count()

    return result


@router.get("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def get_workspace(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    workspace.member_count = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).count()

    return workspace


@router.patch("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def update_workspace(
    workspace_id: int,
    data: schemas.WorkspaceUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

    if data.name:
        workspace.name = data.name
    if data.description is not None:
        workspace.description = data.description

    db.commit()
    db.refresh(workspace)
    workspace.member_count = len(workspace.members)
    return workspace


@router.post("/{workspace_id}/invite", response_model=schemas.WorkspaceMemberResponse)
def invite_member(
    workspace_id: int,
    invite: schemas.InviteMember,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    # Find user by email
    user = db.query(models.User).filter(
        models.User.email == invite.email.lower()
    ).first()

    if not user:
        raise HTTPException(status_code=404, detail="User with this email not found")

    # Check already member
    existing = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user.id
    ).first()

    if existing:
        raise HTTPException(status_code=400, detail="User is already a member")

    # Add member
    new_member = models.WorkspaceMember(
        workspace_id=workspace_id,
        user_id=user.id,
        role=invite.role
    )
    db.add(new_member)
    db.commit()
    db.refresh(new_member)

    # Send notification in background
    workspace = db.query(models.Workspace).filter(
        models.Workspace.id == workspace_id
    ).first()

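    background_tasks.add_task(
        send_notification_email,
        user.email,
        f"You've been added to {workspace.name}",
        # .value gives "member" on every Python version; formatting the enum
        # itself can print "WorkspaceRole.member" on newer versions.
        f"Hi {user.name}, you've been added as {invite.role.value} to workspace '{workspace.name}'"
    )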

    return new_member


@router.get("/{workspace_id}/members", response_model=list[schemas.WorkspaceMemberResponse])
def get_members(
    workspace_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    return db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id
    ).all()


@router.delete("/{workspace_id}/members/{user_id}", status_code=204)
def remove_member(
    workspace_id: int,
    user_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    member = db.query(models.WorkspaceMember).filter(
        models.WorkspaceMember.workspace_id == workspace_id,
        models.WorkspaceMember.user_id == user_id
    ).first()

    if not member:
        raise HTTPException(status_code=404, detail="Member not found")

    if member.role == models.WorkspaceRole.owner:
        raise HTTPException(status_code=400, detail="Cannot remove workspace owner")

    db.delete(member)
    db.commit()
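
get_my_workspaces runs one extra COUNT query for every workspace on the page. With larger pages it can be cheaper to fetch all the counts in a single GROUP BY query. The sketch below shows the idea with the standard `sqlite3` module and a cut-down `workspace_members` table. The helper `member_counts` and the demo data are assumptions for illustration, not project code.

```python
import sqlite3


def member_counts(conn: sqlite3.Connection, workspace_ids: list) -> dict:
    """Return {workspace_id: member_count} using one query for all ids."""
    if not workspace_ids:
        return {}
    placeholders = ",".join("?" for _ in workspace_ids)
    rows = conn.execute(
        "SELECT workspace_id, COUNT(*) FROM workspace_members "
        f"WHERE workspace_id IN ({placeholders}) GROUP BY workspace_id",
        workspace_ids,
    ).fetchall()
    # Workspaces with no rows do not appear in the result, so start from 0.
    counts = {ws_id: 0 for ws_id in workspace_ids}
    counts.update(dict(rows))
    return counts


def demo() -> dict:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE workspace_members (workspace_id INTEGER, user_id INTEGER)"
    )
    conn.executemany(
        "INSERT INTO workspace_members VALUES (?, ?)",
        [(1, 10), (1, 11), (2, 10)],
    )
    result = member_counts(conn, [1, 2, 3])
    conn.close()
    return result
```

The same shape can be written in SQLAlchemy with `func.count` and `group_by`, then used to fill in `member_count` for each workspace on the page.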

routers/projects.py

# routers/projects.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate

router = APIRouter(prefix="/workspaces/{workspace_id}/projects", tags=["Projects"])


@router.post("", response_model=schemas.ProjectResponse, status_code=201)
def create_project(
    workspace_id: int,
    data: schemas.ProjectCreate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = models.Project(
        name=data.name,
        description=data.description,
        workspace_id=workspace_id
    )
    db.add(project)
    db.commit()
    db.refresh(project)
    project.task_count = 0
    return project


@router.get("", response_model=schemas.PaginatedResponse[schemas.ProjectResponse])
def get_projects(
    workspace_id: int,
    page: int = 1,
    per_page: int = 10,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    query = db.query(models.Project).filter(
        models.Project.workspace_id == workspace_id
    )
    result = paginate(query, page, per_page)

    for project in result["data"]:
        project.task_count = db.query(models.Task).filter(
            models.Task.project_id == project.id
        ).count()

    return result


@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_member)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    project.task_count = db.query(models.Task).filter(
        models.Task.project_id == project_id
    ).count()

    return project


@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
    workspace_id: int,
    project_id: int,
    data: schemas.ProjectUpdate,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(project, field, value)

    db.commit()
    db.refresh(project)
    project.task_count = len(project.tasks)
    return project


@router.delete("/{project_id}", status_code=204)
def delete_project(
    workspace_id: int,
    project_id: int,
    db: Session = Depends(get_db),
    membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
    project = db.query(models.Project).filter(
        models.Project.id == project_id,
        models.Project.workspace_id == workspace_id
    ).first()

    if not project:
        raise HTTPException(status_code=404, detail="Project not found")

    db.delete(project)
    db.commit()
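
Deleting a project has to deal with the tasks that reference it, because `tasks.project_id` is not nullable. One option is to let the database remove the child rows with `ON DELETE CASCADE`. SQLAlchemy also offers an ORM-level `cascade` option on relationships. The sketch below shows the database-level behaviour with the standard `sqlite3` module. The two-table schema is a simplified assumption, not the models from this project.

```python
import sqlite3


def delete_project_with_tasks() -> int:
    """Create a project with two tasks, delete the project, and return
    how many tasks are left."""
    conn = sqlite3.connect(":memory:")
    # SQLite leaves foreign key enforcement off unless it is enabled per connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.execute(
        "CREATE TABLE projects (id INTEGER PRIMARY KEY, name TEXT NOT NULL)"
    )
    conn.execute(
        "CREATE TABLE tasks ("
        " id INTEGER PRIMARY KEY,"
        " title TEXT NOT NULL,"
        " project_id INTEGER NOT NULL"
        "   REFERENCES projects(id) ON DELETE CASCADE)"
    )
    conn.execute("INSERT INTO projects (id, name) VALUES (1, 'Launch')")
    conn.executemany(
        "INSERT INTO tasks (title, project_id) VALUES (?, 1)",
        [("Design",), ("Build",)],
    )

    # Deleting the parent row removes its tasks as well.
    conn.execute("DELETE FROM projects WHERE id = 1")
    remaining = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
    conn.close()
    return remaining
```

Without the cascade clause (and with foreign keys enabled), the same DELETE would be rejected while tasks still point at the project.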

routers/tasks.py

# routers/tasks.py

from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, File, UploadFile
from sqlalchemy.orm import Session
from sqlalchemy import or_
import os
import uuid
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member
from utils import paginate, send_notification_email
from models import TaskStatus, TaskPriority

router = APIRouter(prefix="/projects/{project_id}/tasks", tags=["Tasks"])

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_project_or_404(project_id: int, db: Session) -> models.Project:
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    return project


@router.post("", response_model=schemas.TaskResponse, status_code=201)
def create_task(
    project_id: int,
    data: schemas.TaskCreate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    project = get_project_or_404(project_id, db)

    task = models.Task(
        title=data.title,
        description=data.description,
        priority=data.priority,
        due_date=data.due_date,
        assignee_id=data.assignee_id,
        project_id=project_id,
        creator_id=current_user.id
    )
    db.add(task)
    db.commit()
    db.refresh(task)

    # Notify assignee in background
    if data.assignee_id and data.assignee_id != current_user.id:
        assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if assignee:
            background_tasks.add_task(
                send_notification_email,
                assignee.email,
                f"New task assigned: {task.title}",
                f"Hi {assignee.name}, you have been assigned task '{task.title}' in project '{project.name}'"
            )

    return task


@router.get("", response_model=schemas.PaginatedResponse[schemas.TaskResponse])
def get_tasks(
    project_id: int,
    page: int = 1,
    per_page: int = 10,
    status: TaskStatus | None = None,
    priority: TaskPriority | None = None,
    assignee_id: int | None = None,
    search: str | None = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    get_project_or_404(project_id, db)

    query = db.query(models.Task).filter(models.Task.project_id == project_id)

    if status:
        query = query.filter(models.Task.status == status)
    if priority:
        query = query.filter(models.Task.priority == priority)
    if assignee_id:
        query = query.filter(models.Task.assignee_id == assignee_id)
    if search:
        query = query.filter(
            or_(
                models.Task.title.ilike(f"%{search}%"),
                models.Task.description.ilike(f"%{search}%")
            )
        )

    return paginate(query, page, per_page)


@router.get("/{task_id}", response_model=schemas.TaskDetailResponse)
def get_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    task.comment_count = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).count()

    return task


@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
    project_id: int,
    task_id: int,
    data: schemas.TaskUpdate,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    old_assignee_id = task.assignee_id
    update_data = data.model_dump(exclude_none=True)
    for field, value in update_data.items():
        setattr(task, field, value)

    db.commit()
    db.refresh(task)

    # Notify new assignee if changed
    if data.assignee_id and data.assignee_id != old_assignee_id:
        new_assignee = db.query(models.User).filter(
            models.User.id == data.assignee_id
        ).first()
        if new_assignee:
            background_tasks.add_task(
                send_notification_email,
                new_assignee.email,
                f"Task reassigned to you: {task.title}",
                f"Hi {new_assignee.name}, task '{task.title}' has been assigned to you."
            )

    return task


@router.delete("/{task_id}", status_code=204)
def delete_task(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.creator_id != current_user.id:
        raise HTTPException(status_code=403, detail="Only task creator can delete")

    db.delete(task)
    db.commit()


@router.post("/{task_id}/attachments", response_model=schemas.AttachmentResponse)
async def upload_attachment(
    project_id: int,
    task_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    content = await file.read()

    if len(content) > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large. Max 10MB")

    # splitext returns "" when the filename has no extension
    extension = os.path.splitext(file.filename or "")[1].lower()
    unique_name = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    with open(file_path, "wb") as f:
        f.write(content)

    attachment = models.Attachment(
        filename=unique_name,
        original_name=file.filename,
        file_size=len(content),
        content_type=file.content_type,
        task_id=task_id,
        uploaded_by=current_user.id
    )
    db.add(attachment)
    db.commit()
    db.refresh(attachment)

    return attachment


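@router.get("/{task_id}/attachments", response_model=list[schemas.AttachmentResponse])
def get_attachments(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    # Make sure the task exists and belongs to this project before listing
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    return db.query(models.Attachment).filter(
        models.Attachment.task_id == task_id
    ).all()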

routers/comments.py

# routers/comments.py

from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user
from utils import paginate

router = APIRouter(prefix="/tasks/{task_id}/comments", tags=["Comments"])


@router.post("", response_model=schemas.CommentResponse, status_code=201)
def create_comment(
    task_id: int,
    data: schemas.CommentCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    comment = models.Comment(
        content=data.content,
        task_id=task_id,
        author_id=current_user.id
    )
    db.add(comment)
    db.commit()
    db.refresh(comment)
    return comment


@router.get("", response_model=schemas.PaginatedResponse[schemas.CommentResponse])
def get_comments(
    task_id: int,
    page: int = 1,
    per_page: int = 20,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    query = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).order_by(models.Comment.created_at.asc())

    return paginate(query, page, per_page)


@router.patch("/{comment_id}", response_model=schemas.CommentResponse)
def update_comment(
    task_id: int,
    comment_id: int,
    data: schemas.CommentUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only edit your own comments")

    comment.content = data.content
    db.commit()
    db.refresh(comment)
    return comment


@router.delete("/{comment_id}", status_code=204)
def delete_comment(
    task_id: int,
    comment_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only delete your own comments")

    db.delete(comment)
    db.commit()
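
Both list endpoints take page and per_page without any bounds, so page=0 or a huge per_page goes straight into the query. Here is a minimal sketch of the arithmetic a pagination helper typically does, with clamping added. page_window, max_per_page and the returned keys are hypothetical names for illustration, not the paginate function from utils.py:

```python
import math


def page_window(page: int, per_page: int, total: int, max_per_page: int = 100) -> dict:
    """Clamp the inputs and compute the slice a paginated query would use."""
    per_page = max(1, min(per_page, max_per_page))   # keep page size in 1..max_per_page
    total_pages = max(1, math.ceil(total / per_page))
    page = max(1, min(page, total_pages))            # keep page inside the valid range
    return {
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "offset": (page - 1) * per_page,             # rows to skip
    }


if __name__ == "__main__":
    print(page_window(page=3, per_page=10, total=45))
```

The offset and per_page values map onto query.offset(...).limit(...) in SQLAlchemy.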

Step 10 — main.py — Final Clean Entry Point

# main.py

import time
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
import models
from database import engine
from routers import auth, users, workspaces, projects, tasks, comments
import os

# Create tables
models.Base.metadata.create_all(bind=engine)

# Create uploads folder
os.makedirs("uploads", exist_ok=True)

app = FastAPI(
    title="Task Manager API",
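    # Swagger UI renders the description as Markdown, so the text is left
    # unindented; lines indented by four spaces would show up as a code block.
    description="""
A complete task management API built with FastAPI.

## Features
- JWT Authentication
- Workspaces with role-based access
- Projects inside workspaces
- Tasks with priorities and statuses
- Comments on tasks
- File attachments
- Background notifications
""",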
    version="1.0.0",
    contact={
        "name": "Gagan",
        "email": "gagan@email.com"
    }
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Request Logging Middleware ────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    print(f"[{request.method}] {request.url.path} → {response.status_code} ({duration}ms)")
    return response

# ── Exception Handlers ────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    errors = [
        {
            "field": " → ".join(str(part) for part in e["loc"]),
            "message": e["msg"]
        }
        for e in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content={"success": False, "message": "Validation failed", "errors": errors}
    )

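@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # This handler also receives HTTPException(status_code=404) raised inside
    # routes, so keep their detail (for example "Task not found").
    detail = getattr(exc, "detail", None)
    if not detail or detail == "Not Found":
        detail = f"Route not found: {request.url.path}"
    return JSONResponse(
        status_code=404,
        content={"success": False, "message": detail}
    )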

# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(workspaces.router)
app.include_router(projects.router)
app.include_router(tasks.router)
app.include_router(comments.router)

# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {
        "message": "Task Manager API",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }

Running the Final Project

uvicorn main:app --reload

Open http://localhost:8000/docs. You'll see the complete API organized into 6 router sections (plus Root):

  • Authentication
  • Users
  • Workspaces
  • Projects
  • Tasks
  • Comments

Complete Testing Flow

Test this complete scenario in /docs:

1. Register two users:

  • User A: gagan@email.com
  • User B: rahul@email.com

2. Login as User A → get token → Authorize in /docs

3. Create a workspace:

{"name": "My Team", "description": "Our workspace"}

4. Invite User B to workspace:

{"email": "rahul@email.com", "role": "member"}

Watch terminal — notification email prints in background.

5. Create a project:

{"name": "Website Redesign", "description": "Q1 project"}

6. Create a task:

{
    "title": "Design homepage mockup",
    "description": "Create Figma designs for the new homepage",
    "priority": "high",
    "assignee_id": 2
}

Watch terminal — assignment notification prints.

7. Login as User B → add a comment on the task

8. Upload an attachment to the task

9. Update task status to in_progress

10. Mark task as done
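
If you'd rather script steps 9 and 10 than click through /docs, here is a rough sketch using only the standard library. The path /projects/{id}/tasks/{id} is inferred from the router code, and the "status" field name, the values in_progress and done, and the IDs are assumptions based on the steps above. api_request, send and status_requests are hypothetical helper names:

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # same base URL the tutorial uses


def api_request(method, path, token=None, payload=None):
    """Build (but do not send) a JSON request with an optional Bearer token."""
    headers = {}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return urllib.request.Request(API_URL + path, data=data, headers=headers, method=method)


def send(request):
    """Send the request and decode the JSON body (needs the server running)."""
    with urllib.request.urlopen(request) as response:
        body = response.read()
        return json.loads(body) if body else None


def status_requests(token, project_id, task_id):
    """Steps 9 and 10: one PATCH per status change (assumed field name: status)."""
    path = f"/projects/{project_id}/tasks/{task_id}"
    return [
        api_request("PATCH", path, token, {"status": status})
        for status in ("in_progress", "done")
    ]
```

With the server running, passing each request from status_requests(token, 1, 1) to send() performs the two updates in order.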


What You've Built

A complete production-grade REST API with:

✅ Clean project structure — routers, models, schemas separated
✅ JWT authentication — register, login, protected routes
✅ Role-based access — owner, admin, member
✅ Full CRUD — workspaces, projects, tasks, comments
✅ Relationships — nested resources, foreign keys
✅ Pagination — on all list endpoints
✅ Search and filtering — tasks by status, priority, assignee
✅ File uploads — attachments on tasks
✅ Background tasks — email notifications
✅ CORS — ready for frontend connection
✅ Middleware — request logging
✅ Custom exception handlers — consistent errors
✅ Auto documentation — /docs and /redoc

Connecting to Your Next.js Frontend

Since you already build Next.js apps — connecting is straightforward:

// lib/api.js in your Next.js project

const API_URL = "http://localhost:8000"

export async function login(email, password) {
    const res = await fetch(`${API_URL}/auth/login`, {
        method: "POST",
        headers: {"Content-Type": "application/json"},
        body: JSON.stringify({email, password})
    })
    return res.json()    // returns {access_token, user}
}

export async function getTasks(projectId, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        headers: {"Authorization": `Bearer ${token}`}
    })
    return res.json()
}

export async function createTask(projectId, taskData, token) {
    const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
        method: "POST",
        headers: {
            "Content-Type": "application/json",
            "Authorization": `Bearer ${token}`
        },
        body: JSON.stringify(taskData)
    })
    return res.json()
}

Same Bearer token pattern your Next.js frontend already uses with NestJS, so there is nothing new to learn on the client side.


What's Next

You now know FastAPI end to end. Depending on where you want to go:

Immediate next steps for this project:

  • Switch SQLite to PostgreSQL: change DATABASE_URL in .env and install a PostgreSQL driver such as psycopg2-binary
  • Deploy to a VPS with Nginx + Uvicorn
  • Add email sending with fastapi-mail
  • Add WebSockets for real-time task updates
  • Write tests with pytest

Deeper FastAPI topics:

  • Alembic migrations — managing database schema changes
  • WebSockets — real-time features
  • Celery + Redis — heavy background job processing
  • pytest — testing your API endpoints
  • Docker — containerizing your app

Given your NestJS and PostgreSQL background — switching to PostgreSQL and deploying this to your existing Azure infrastructure would be the natural next step. The code is already ready for it.
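
One thing to check before the switch: if your database.py passes connect_args={"check_same_thread": False} to create_engine (the usual SQLite setup; that file is not shown in this section, so treat this as an assumption), that argument only makes sense for SQLite. A small sketch that picks the extra arguments from the URL. engine_kwargs is a hypothetical helper, and the PostgreSQL URL uses placeholder credentials:

```python
def engine_kwargs(database_url: str) -> dict:
    """Return extra create_engine() arguments that depend on the database backend."""
    if database_url.startswith("sqlite"):
        # SQLite connections are bound to the creating thread by default;
        # FastAPI tutorials usually disable that check.
        return {"connect_args": {"check_same_thread": False}}
    # PostgreSQL and other server databases do not accept check_same_thread
    return {}


if __name__ == "__main__":
    print(engine_kwargs("sqlite:///./taskmanager.db"))
    print(engine_kwargs("postgresql+psycopg2://user:password@localhost:5432/taskmanager"))
```

In database.py this would be used as create_engine(settings.database_url, **engine_kwargs(settings.database_url)).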


Advanced Features in Fast APIs

What We're Covering

This stage covers the features that take your API from "works locally" to "production ready":

  • CORS — so your frontend can talk to your API
  • Middleware — code that runs on every request
  • Background Tasks — run tasks after sending response
  • File Uploads — handle images and documents
  • APIRouter — organize routes into separate files
  • Custom Exception Handlers — consistent error responses

Part 1 — CORS

What is CORS?

CORS = Cross-Origin Resource Sharing

When your Next.js frontend (running on localhost:3000) tries to call your FastAPI backend (running on localhost:8000) — the browser blocks it by default. Different ports = different origins = CORS error.

You've definitely seen this error before in your NestJS projects:

Access to fetch at 'http://localhost:8000' from origin 'http://localhost:3000' 
has been blocked by CORS policy

FastAPI fixes this with one middleware setup:

# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:3000",      # Next.js dev server
        "http://localhost:3001",
        "https://yourfrontend.com",   # production frontend
    ],
    allow_credentials=True,           # allows cookies and auth headers
    allow_methods=["*"],              # allows GET, POST, PUT, DELETE etc
    allow_headers=["*"],              # allows Authorization, Content-Type etc
)

For development only — allow everything:

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # never use this in production
    allow_methods=["*"],
    allow_headers=["*"],
)

Always restrict allow_origins in production — only list domains that should access your API.
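
One way to keep that list different per environment is to read it from an environment variable. This is a sketch only: CORS_ORIGINS is a hypothetical variable name and parse_origins is a helper made up for this example. Trailing slashes are stripped because the browser's Origin header never carries one, so "https://yourfrontend.com/" would not match:

```python
import os


def parse_origins(raw: str) -> list[str]:
    """Split a comma-separated origin list, dropping blanks and trailing slashes."""
    origins = []
    for item in raw.split(","):
        origin = item.strip().rstrip("/")
        if origin:
            origins.append(origin)
    return origins


# CORS_ORIGINS is a hypothetical variable name; the default matches the dev server above
ALLOWED_ORIGINS = parse_origins(os.getenv("CORS_ORIGINS", "http://localhost:3000"))
```

ALLOWED_ORIGINS can then be passed as allow_origins=ALLOWED_ORIGINS in the add_middleware call.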


Part 2 — Middleware

What is Middleware?

Middleware is code that runs on every single request before it reaches your route, and on every response before it goes back to the client.

Think of it like a checkpoint — every request passes through it.

Request → Middleware → Route Handler → Middleware → Response

You use middleware for things that apply to all routes:

  • Logging every request
  • Measuring response time
  • Adding headers to every response
  • Checking API keys

Creating Custom Middleware

# main.py
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = time.time()

    # Code here runs BEFORE the route handler
    print(f"→ {request.method} {request.url}")

    response = await call_next(request)    # call the actual route

    # Code here runs AFTER the route handler
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(round(process_time * 1000, 2)) + "ms"

    print(f"← {response.status_code} ({round(process_time * 1000, 2)}ms)")

    return response

Now every request logs like this in your terminal:

→ GET http://localhost:8000/users
← 200 (3.42ms)

→ POST http://localhost:8000/auth/login
← 200 (145.23ms)

And every response has the processing time in its headers — visible in browser dev tools.


Multiple Middlewares

You can stack multiple middlewares. Each one wraps those registered before it, so the middleware added last sees the request first and the response last:

@app.middleware("http")
async def log_requests(request: Request, call_next):
    print(f"Request: {request.method} {request.url.path}")
    response = await call_next(request)
    return response


@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    return response
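
To see the order without starting a server, here is a plain-Python model of the wrapping. It is a simplified illustration, not Starlette's actual code; make_middleware and route are made-up names, and the middleware names are reused from the example above:

```python
calls = []


def make_middleware(name, call_next):
    """Wrap call_next the way an HTTP middleware wraps the rest of the app."""
    def handler(request):
        calls.append(f"{name}: before")
        response = call_next(request)
        calls.append(f"{name}: after")
        return response
    return handler


def route(request):
    calls.append("route")
    return "response"


# Each new registration wraps everything registered before it
app = route
for name in ["log_requests", "add_security_headers"]:   # registration order
    app = make_middleware(name, app)

app("GET /users")
print(calls)
```

add_security_headers was registered last, so its "before" entry comes first in the list and its "after" entry comes last.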

Part 3 — Background Tasks

What are Background Tasks?

Sometimes after handling a request you need to do extra work that the client doesn't need to wait for — like sending an email, logging to a file, processing data.

Without background tasks:

Client waits... API sends email (3 seconds)... API returns response
Total wait: 3+ seconds

With background tasks:

Client waits... API returns response immediately
Email sends in background (client already got response)
Total wait: milliseconds

Basic Background Task

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


def send_welcome_email(email: str, name: str):
    """This runs in background after response is sent."""
    import time
    time.sleep(2)    # simulate email sending delay
    print(f"Email sent to {email}: Welcome {name}!")


@app.post("/users/register")
def register_user(
    name: str,
    email: str,
    background_tasks: BackgroundTasks
):
    # Add task to run after response
    background_tasks.add_task(send_welcome_email, email, name)

    # This response goes back immediately
    # Email sends after this
    return {"message": "Registered successfully! Check your email."}

Client gets response instantly. Email sends 2 seconds later in background. Client never waits.


Background Task with Database

A realistic example — log every login attempt:

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException, Request
from sqlalchemy.orm import Session
from datetime import datetime
import schemas
import crud
import auth
from database import get_db


def log_login_attempt(email: str, success: bool, ip: str):
    """Log to file in background."""
    status = "SUCCESS" if success else "FAILED"
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] LOGIN {status} — Email: {email} — IP: {ip}\n"

    with open("login_log.txt", "a") as f:
        f.write(log_entry)

    print(log_entry.strip())


@app.post("/auth/login")
def login(
    credentials: schemas.LoginRequest,
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db)
):
    user = crud.authenticate_user(db, credentials.email, credentials.password)
    client_ip = request.client.host

    if not user:
        background_tasks.add_task(log_login_attempt, credentials.email, False, client_ip)
        raise HTTPException(status_code=401, detail="Invalid credentials")

    token = auth.create_access_token(data={"user_id": user.id})
    background_tasks.add_task(log_login_attempt, credentials.email, True, client_ip)

    return {"access_token": token, "token_type": "bearer", "user": user}

Multiple Background Tasks

You can add multiple tasks — they all run after response:

@app.post("/orders")
def create_order(
    order: OrderCreate,
    background_tasks: BackgroundTasks,
    current_user = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    new_order = crud.create_order(db, order, current_user.id)

    background_tasks.add_task(send_order_confirmation_email, current_user.email, new_order)
    background_tasks.add_task(notify_warehouse, new_order.id)
    background_tasks.add_task(update_inventory, order.product_id, order.quantity)

    return new_order    # returns immediately, all 3 tasks run after
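
One caveat: as far as I know, current Starlette releases run the tasks one after another in the order they were added, so an unhandled exception in one task stops the ones after it. A minimal sketch of a wrapper that logs the failure instead. log_failures is a hypothetical decorator, the task bodies are stand-ins, and it only covers plain (non-async) task functions:

```python
import functools
import logging

logger = logging.getLogger("background")


def log_failures(func):
    """Wrap a background task so an exception is logged instead of propagating."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception("Background task %s failed", func.__name__)
            return None
    return wrapper


@log_failures
def notify_warehouse(order_id: int):
    # Hypothetical task body that fails, to show the wrapper in action
    raise ConnectionError(f"warehouse unreachable for order {order_id}")


@log_failures
def update_inventory(product_id: int, quantity: int):
    return f"product {product_id} reduced by {quantity}"
```

With the wrapper in place, a failing notify_warehouse no longer prevents update_inventory from running.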

Part 4 — File Uploads

Handling File Uploads

FastAPI handles file uploads cleanly. You need python-multipart which you already installed.

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    return {
        "filename": file.filename,
        "content_type": file.content_type,
        "size": file.size
    }

UploadFile gives you:

  • file.filename — original filename
  • file.content_type — mime type (image/jpeg, application/pdf etc)
  • file.size — file size in bytes
  • await file.read() — file content as bytes

Saving Uploaded File

import os
import uuid
from fastapi import FastAPI, File, UploadFile, HTTPException

app = FastAPI()

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)    # create folder if not exists

ALLOWED_TYPES = ["image/jpeg", "image/png", "image/webp"]
MAX_SIZE = 5 * 1024 * 1024    # 5MB in bytes


@app.post("/upload/image")
async def upload_image(file: UploadFile = File(...)):
    # Validate file type
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"File type not allowed. Allowed: {ALLOWED_TYPES}"
        )

    # Read file content
    content = await file.read()

    # Validate file size
    if len(content) > MAX_SIZE:
        raise HTTPException(
            status_code=400,
            detail="File too large. Maximum size is 5MB"
        )

    # Generate unique filename to avoid conflicts
    # splitext returns "" when the filename has no extension
    extension = os.path.splitext(file.filename or "")[1].lower()
    unique_filename = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_filename)

    # Save file
    with open(file_path, "wb") as f:
        f.write(content)

    return {
        "message": "File uploaded successfully",
        "filename": unique_filename,
        "original_name": file.filename,
        "size": len(content),
        "url": f"/files/{unique_filename}"
    }
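
The route above loads the whole file into memory before it checks the size. Here is a sketch of reading in chunks and stopping as soon as the limit is passed. read_limited and FileTooLarge are hypothetical names, and the helper works on any binary file-like object:

```python
import io


class FileTooLarge(Exception):
    """Raised when an upload exceeds the allowed size."""


def read_limited(fileobj, max_bytes: int, chunk_size: int = 64 * 1024) -> bytes:
    """Read a binary file-like object in chunks, stopping once max_bytes is exceeded."""
    chunks = []
    total = 0
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        total += len(chunk)
        if total > max_bytes:
            raise FileTooLarge(f"more than {max_bytes} bytes")
        chunks.append(chunk)
    return b"".join(chunks)


if __name__ == "__main__":
    small = io.BytesIO(b"x" * 1000)
    print(len(read_limited(small, max_bytes=5 * 1024 * 1024)))
```

Inside a route you could call read_limited(file.file, MAX_SIZE), since UploadFile exposes the underlying file object as file.file, and turn FileTooLarge into an HTTPException. Keep in mind the upload has already been received by the time the route runs, so this limits memory use, not bandwidth.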

Serving Uploaded Files

After saving, you want to serve them via URL:

from fastapi.staticfiles import StaticFiles

# Mount uploads folder as static files
# Now http://localhost:8000/files/image.jpg serves the file
app.mount("/files", StaticFiles(directory="uploads"), name="files")

Upload with Form Data and Fields Together

Sometimes you want to upload a file AND send extra data together:

from fastapi import Form

@app.post("/upload/profile-picture")
async def upload_profile_picture(
    file: UploadFile = File(...),
    user_id: int = Form(...),
    description: str = Form(default="")
):
    # Can't use JSON body with file upload — must use Form fields
    content = await file.read()

    return {
        "user_id": user_id,
        "description": description,
        "filename": file.filename,
        "size": len(content)
    }

Important — when uploading files you cannot use JSON body for other fields. You must use Form() for text fields alongside File().


Multiple File Upload

@app.post("/upload/multiple")
async def upload_multiple(files: list[UploadFile] = File(...)):
    results = []

    for file in files:
        content = await file.read()
        # splitext returns "" when the filename has no extension
        extension = os.path.splitext(file.filename or "")[1].lower()
        unique_name = f"{uuid.uuid4()}{extension}"
        file_path = os.path.join(UPLOAD_DIR, unique_name)

        with open(file_path, "wb") as f:
            f.write(content)

        results.append({
            "original": file.filename,
            "saved_as": unique_name,
            "size": len(content)
        })

    return {"uploaded": len(results), "files": results}
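
The filename comes from the client, so it is worth being careful with it. Below is a minimal sketch of a helper that keeps only a known extension and ignores any directory part. unique_name_for and ALLOWED_EXTENSIONS are hypothetical names, and the allow-list is only an example:

```python
import os
import uuid

ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp", ".pdf"}  # illustrative allow-list


def unique_name_for(filename):
    """Return a UUID-based name that keeps only a known, lowercased extension."""
    # basename drops any directory part a client may have put in the name
    base = os.path.basename((filename or "").replace("\\", "/"))
    extension = os.path.splitext(base)[1].lower()   # "" when there is no dot
    if extension not in ALLOWED_EXTENSIONS:
        raise ValueError(f"extension {extension!r} is not allowed")
    return f"{uuid.uuid4()}{extension}"
```

In the upload routes this could replace the two lines that build the unique name, with ValueError mapped to a 400 response.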

Part 5 — APIRouter — Organizing Routes

The Problem

As your app grows, main.py becomes thousands of lines with routes for users, posts, products, auth, orders — everything mixed together. Nightmare to maintain.

APIRouter lets you split routes into separate files — exactly like Controllers in NestJS.


Creating Routers

Create a routers/ folder:

fastapi-learning/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── posts.py
│   └── uploads.py
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py
└── dependencies.py

routers/auth.py

# routers/auth.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
import auth as auth_utils
from database import get_db

router = APIRouter(
    prefix="/auth",           # all routes start with /auth
    tags=["Authentication"]   # groups in /docs
)


@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
    existing = crud.get_user_by_email(db, user.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")
    return crud.create_user(db, user)


@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    user = crud.authenticate_user(db, credentials.email, credentials.password)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid email or password")

    token = auth_utils.create_access_token(data={"user_id": user.id})
    return {"access_token": token, "token_type": "bearer", "user": user}

routers/users.py

# routers/users.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
from database import get_db
from dependencies import get_current_user

router = APIRouter(
    prefix="/users",
    tags=["Users"]
)


@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user = Depends(get_current_user)):
    return current_user


@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
    user_update: schemas.UserUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.update_user(db, current_user.id, user_update)


@router.delete("/me", status_code=status.HTTP_204_NO_CONTENT)
def delete_me(
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    crud.delete_user(db, current_user.id)


@router.get("", response_model=list[schemas.UserResponse])
def get_users(
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.get_users(db, skip, limit)


@router.get("/{user_id}", response_model=schemas.UserResponse)
def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    user = crud.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

routers/posts.py

# routers/posts.py

from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import schemas
import crud
from database import get_db
from dependencies import get_current_user

router = APIRouter(
    prefix="/posts",
    tags=["Posts"]
)


@router.post("", response_model=schemas.PostResponse, status_code=201)
def create_post(
    post: schemas.PostCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.create_post(db, post, author_id=current_user.id)


@router.get("", response_model=list[schemas.PostResponse])
def get_posts(
    skip: int = 0,
    limit: int = 10,
    published: bool | None = None,
    db: Session = Depends(get_db)
):
    return crud.get_posts(db, skip, limit, published)


@router.get("/my", response_model=list[schemas.PostResponse])
def get_my_posts(
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    return crud.get_posts(db, author_id=current_user.id)


@router.get("/{post_id}", response_model=schemas.PostWithAuthor)
def get_post(post_id: int, db: Session = Depends(get_db)):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post


@router.patch("/{post_id}", response_model=schemas.PostResponse)
def update_post(
    post_id: int,
    post_update: schemas.PostUpdate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    return crud.update_post(db, post_id, post_update)


@router.delete("/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_post(
    post_id: int,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user)
):
    post = crud.get_post(db, post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    if post.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    crud.delete_post(db, post_id)

routers/uploads.py

# routers/uploads.py

import os
import uuid
from fastapi import APIRouter, File, UploadFile, HTTPException, Depends
from fastapi.responses import FileResponse
from dependencies import get_current_user

router = APIRouter(
    prefix="/uploads",
    tags=["Uploads"]
)

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
ALLOWED_TYPES = ["image/jpeg", "image/png", "image/webp", "application/pdf"]
MAX_SIZE = 5 * 1024 * 1024


@router.post("/image")
async def upload_image(
    file: UploadFile = File(...),
    current_user = Depends(get_current_user)
):
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=400, detail="File type not allowed")

    content = await file.read()

    if len(content) > MAX_SIZE:
        raise HTTPException(status_code=400, detail="File too large. Max 5MB")

    # splitext returns "" when the filename has no extension
    extension = os.path.splitext(file.filename or "")[1].lower()
    unique_name = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)

    with open(file_path, "wb") as f:
        f.write(content)

    return {
        "message": "Uploaded successfully",
        "filename": unique_name,
        "url": f"/files/{unique_name}",
        "uploaded_by": current_user.id
    }

Clean main.py — Register All Routers

Now main.py is beautifully clean:

# main.py

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import time
import models
from database import engine
from routers import auth, users, posts, uploads

# Create database tables
models.Base.metadata.create_all(bind=engine)

app = FastAPI(
    title="Production Ready API",
    description="FastAPI with Auth, Database, File Uploads",
    version="5.0.0"
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Middleware ────────────────────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    print(f"{request.method} {request.url.path} → {response.status_code} ({duration}ms)")
    return response

# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(uploads.router)

# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {"message": "API is running", "docs": "/docs"}

Open /docs — routes are now organized into groups: Authentication, Users, Posts, Uploads. Much cleaner.


Part 6 — Custom Exception Handlers

Consistent Error Responses

Right now different errors return different formats. Let's make all errors consistent:

# main.py — add these handlers

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from sqlalchemy.exc import SQLAlchemyError


# Handle validation errors (422)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    errors = []
    for error in exc.errors():
        errors.append({
            "field": " → ".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"]
        })

    return JSONResponse(
        status_code=422,
        content={
            "success": False,
            "message": "Validation failed",
            "errors": errors
        }
    )


# Handle database errors (500)
@app.exception_handler(SQLAlchemyError)
async def database_exception_handler(request: Request, exc: SQLAlchemyError):
    # Log the details server-side; never send raw database errors to clients
    print(f"Database error on {request.url.path}: {exc}")
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "message": "Database error occurred"
        }
    )


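# Handle 404 not found
@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # This handler also receives HTTPException(status_code=404) raised inside
    # routes, so keep a route's own message (e.g. "User not found") when present
    detail = getattr(exc, "detail", None)
    if detail in (None, "Not Found"):
        detail = f"Route {request.url.path} not found"
    return JSONResponse(
        status_code=404,
        content={
            "success": False,
            "message": detail
        }
    )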

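Now validation, database, and 404 errors share the same shape, so the frontend can handle them consistently. Errors raised with HTTPException for other status codes (400, 401, 403) still use FastAPI's default {"detail": ...} body unless you register a handler for them too.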


Standard Response Wrapper — Optional but Professional

Some teams wrap all responses in a standard format:

# schemas.py — add this

from typing import Generic, TypeVar
from pydantic import BaseModel

T = TypeVar("T")

class APIResponse(BaseModel, Generic[T]):
    success: bool = True
    message: str = "Success"
    data: T | None = None

Use it in routes:

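# assumes: from schemas import APIResponse, UserResponse
@router.get("/{user_id}", response_model=APIResponse[UserResponse])
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = crud.get_user(db, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # Convert the ORM object to a schema first so the wrapper can serialize it
    return APIResponse[UserResponse](
        data=UserResponse.model_validate(user),
        message="User fetched successfully"
    )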

Response:

{
    "success": true,
    "message": "User fetched successfully",
    "data": {
        "id": 1,
        "name": "Gagan",
        ...
    }
}

This is a common pattern in production APIs — especially when building for mobile apps.


Final Project Structure

fastapi-learning/
├── routers/
│   ├── __init__.py
│   ├── auth.py
│   ├── users.py
│   ├── posts.py
│   └── uploads.py
├── uploads/               ← uploaded files stored here
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py
├── dependencies.py
├── config.py
└── requirements.txt

Save Requirements

pip freeze > requirements.txt

Your requirements.txt will include entries like these (pip freeze also lists their dependencies, and your exact versions may differ):

fastapi==0.115.0
uvicorn==0.30.0
sqlalchemy==2.0.35
passlib[bcrypt]==1.7.4
python-jose[cryptography]==3.3.0
python-multipart==0.0.12
python-dotenv==1.0.1
pydantic-settings==2.5.2

Exercise 🏋️

Add these features to complete the project:

1. Profile Picture Upload:

  • POST /users/me/avatar — upload profile picture (protected)
  • Save filename to user's database record
  • Add avatar_url field to UserResponse
  • Add avatar column to User model

2. Post Cover Image:

  • PATCH /posts/{id}/cover — upload cover image for a post
  • Only post author can upload cover
  • Add cover_image field to Post model and response

3. Request Logging Middleware:

[2025-03-03 10:30:01] POST /auth/login 200 145ms
[2025-03-03 10:30:05] GET /users/me 200 3ms
[2025-03-03 10:30:10] POST /posts 201 12ms

Save logs to requests.log file using background tasks.

4. Pagination Response: Instead of returning a plain list, return:

{
    "data": [...],
    "total": 50,
    "page": 1,
    "per_page": 10,
    "total_pages": 5,
    "has_next": true,
    "has_prev": false
}

Create a reusable paginate() helper function in a new utils.py file.
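As a starting point for that helper, here is a minimal sketch of the metadata arithmetic only. The signature is an assumption: it expects the caller to pass the rows for the current page plus the total row count, and it leaves the database query to the route.

```python
import math
from typing import Any, Sequence


def paginate(items: Sequence[Any], total: int, page: int, per_page: int) -> dict[str, Any]:
    """Wrap one page of items with pagination metadata (illustrative sketch)."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    # Round up so a partially filled last page still counts as a page.
    total_pages = math.ceil(total / per_page)
    return {
        "data": list(items),
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


first_page = paginate(list(range(10)), total=50, page=1, per_page=10)
print(first_page["total_pages"], first_page["has_next"], first_page["has_prev"])
```

In a route, total would typically come from query.count() and the items from query.offset((page - 1) * per_page).limit(per_page).all().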


Authentication & Security in Fast APIs

What We're Building

By the end of this stage your API will have:

  • User registration with hashed passwords
  • Login that returns a JWT token
  • Protected routes that require a valid token
  • Current user detection from token

This is how many production APIs work: token-based authentication is a widely used pattern.


How JWT Authentication Works

Since you build with NestJS, you already know this flow. Here is a quick recap in a FastAPI context:

1. User registers → password gets hashed → stored in database
2. User logs in → password verified → JWT token generated → sent to client
3. Client sends token in every request header
4. Server verifies token → extracts user info → allows or denies access
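To make steps 2 to 4 concrete, the sketch below signs and verifies an HS256 token using only the standard library. It is an illustration of what a JWT contains, not a replacement for python-jose: the function names and the demo secret are made up for this example, and a real verifier should also check the header's alg value.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding, then decode."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict, secret: str) -> str:
    """Return header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: str) -> dict:
    """Recompute the signature, compare it, then check expiry."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("Malformed token") from None
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("Bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    # A token without "exp" is treated as expired in this sketch.
    if payload.get("exp", 0) < time.time():
        raise ValueError("Token expired")
    return payload


token = sign_token({"user_id": 1, "exp": int(time.time()) + 60}, "demo-secret")
print(token.split(".")[0])
print(verify_token(token, "demo-secret")["user_id"])
```

The payload is only encoded, not encrypted, so anyone holding the token can read it. The signature is what stops a client from changing user_id without knowing the secret.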

Installing Dependencies

pip install python-jose[cryptography] passlib[bcrypt] python-multipart
  • python-jose — creates and verifies JWT tokens
  • passlib[bcrypt] — hashes passwords with bcrypt
  • python-multipart — needed for OAuth2 form login
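Before wiring in passlib, it can help to see what salted password hashing does. The sketch below deliberately swaps in PBKDF2 from the standard library instead of bcrypt so that it runs without extra packages; the storage format and the iteration count are assumptions made for this example.

```python
import hashlib
import hmac
import os


def hash_password(password: str, iterations: int = 200_000) -> str:
    """Hash with a fresh random salt; store iterations, salt and digest together."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Re-run the hash with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)


first = hash_password("secret123")
second = hash_password("secret123")
print(first != second)                       # same password, different salts
print(verify_password("secret123", first))
print(verify_password("wrong-guess", first))
```

The salt is why two users with the same password end up with different stored values, and why the original password cannot be looked up from the hash alone. passlib's bcrypt scheme follows the same hash-then-verify idea.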

Updated Project Structure

fastapi-learning/
├── venv/
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth.py            ← NEW: all auth logic lives here
├── dependencies.py    ← NEW: reusable dependencies
└── requirements.txt

Step 1 — Update models.py

Add a password field to User:


    # models.py

    from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, ForeignKey
    from sqlalchemy.orm import relationship
    from sqlalchemy.sql import func
    from database import Base


    class User(Base):
        __tablename__ = "users"

        id = Column(Integer, primary_key=True, index=True)
        name = Column(String(50), nullable=False)
        email = Column(String(100), unique=True, nullable=False, index=True)
        password = Column(String(255), nullable=False)      # hashed password stored here
        age = Column(Integer, nullable=False)
        city = Column(String(50), default="Unknown")
        is_active = Column(Boolean, default=True)
        created_at = Column(DateTime, server_default=func.now())

        posts = relationship("Post", back_populates="author")


    class Post(Base):
        __tablename__ = "posts"

        id = Column(Integer, primary_key=True, index=True)
        title = Column(String(100), nullable=False)
        content = Column(Text, nullable=False)
        published = Column(Boolean, default=False)
        created_at = Column(DateTime, server_default=func.now())
        author_id = Column(Integer, ForeignKey("users.id"), nullable=False)

        author = relationship("User", back_populates="posts")


Step 2 — auth.py — All Auth Logic


    # auth.py

    from datetime import datetime, timedelta, timezone
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from fastapi import HTTPException, status

    # ── Configuration ─────────────────────────────────
    SECRET_KEY = "your-super-secret-key-change-this-in-production"
    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30

    # ── Password Hashing ──────────────────────────────
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


    def hash_password(password: str) -> str:
        """Convert plain password to bcrypt hash."""
        return pwd_context.hash(password)


    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """Check if plain password matches the stored hash."""
        return pwd_context.verify(plain_password, hashed_password)


    # ── JWT Token ─────────────────────────────────────
    def create_access_token(data: dict) -> str:
        """Create a JWT token with expiry."""
        to_encode = data.copy()
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime
        expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


    def verify_token(token: str) -> dict:
        """Verify JWT token and return its payload."""
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired token",
                headers={"WWW-Authenticate": "Bearer"}
            )


Step 3 — dependencies.py — Get Current User


    # dependencies.py

    from fastapi import Depends, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer
    from sqlalchemy.orm import Session
    from database import get_db
    import auth
    import crud

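    # This tells FastAPI which endpoint issues tokens for the /docs Authorize form.
    # That form sends form data, so point it at /auth/token (not the JSON /auth/login).
    # It also adds a lock icon on protected routes in /docs.
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")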


    def get_current_user(
        token: str = Depends(oauth2_scheme),
        db: Session = Depends(get_db)
    ):
        """Extract current user from JWT token."""
        payload = auth.verify_token(token)
        user_id: int | None = payload.get("user_id")

        if user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload"
            )

        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User no longer exists"
            )

        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Account is deactivated"
            )

        return user


    def get_current_active_user(
        current_user = Depends(get_current_user)
    ):
        """Same as get_current_user — alias for clarity."""
        return current_user


Step 4 — Update schemas.py

Add auth-related schemas:


    # schemas.py

    from pydantic import BaseModel, Field, field_validator
    from datetime import datetime


    # ── User Schemas ─────────────────────────────────

    class UserCreate(BaseModel):
        name: str = Field(min_length=2, max_length=50)
        email: str
        password: str = Field(min_length=8)
        age: int = Field(ge=0, le=150)
        city: str = Field(default="Unknown")

        @field_validator("email")
        @classmethod
        def normalize_email(cls, value: str) -> str:
            return value.lower().strip()

        @field_validator("password")
        @classmethod
        def password_strength(cls, value: str) -> str:
            if value.isdigit():
                raise ValueError("Password cannot be only numbers")
            # isalpha() would let "abc!defg" through, so check for a digit directly
            if not any(ch.isdigit() for ch in value):
                raise ValueError("Password must include at least one number")
            return value


    class UserUpdate(BaseModel):
        name: str | None = Field(default=None, min_length=2)
        age: int | None = Field(default=None, ge=0, le=150)
        city: str | None = None


    class UserResponse(BaseModel):
        id: int
        name: str
        email: str
        age: int
        city: str
        is_active: bool
        created_at: datetime

        model_config = {"from_attributes": True}


    # ── Auth Schemas ─────────────────────────────────

    class LoginRequest(BaseModel):
        email: str
        password: str


    class TokenResponse(BaseModel):
        access_token: str
        token_type: str = "bearer"
        user: UserResponse


    # ── Post Schemas ─────────────────────────────────

    class PostCreate(BaseModel):
        title: str = Field(min_length=5, max_length=100)
        content: str = Field(min_length=20)


    class PostUpdate(BaseModel):
        title: str | None = Field(default=None, min_length=5)
        content: str | None = Field(default=None, min_length=20)
        published: bool | None = None


    class PostResponse(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        author_id: int
        created_at: datetime

        model_config = {"from_attributes": True}


    class PostWithAuthor(BaseModel):
        id: int
        title: str
        content: str
        published: bool
        created_at: datetime
        author: UserResponse

        model_config = {"from_attributes": True}

Notice that PostCreate no longer has author_id: the logged-in user's ID is used automatically.

Step 5 — Update crud.py


    # crud.py

    from sqlalchemy.orm import Session
    import models
    import schemas
    import auth


    # ── User CRUD ─────────────────────────────────────

    def get_user(db: Session, user_id: int):
        return db.query(models.User).filter(models.User.id == user_id).first()


    def get_user_by_email(db: Session, email: str):
        return db.query(models.User).filter(models.User.email == email).first()


    def get_users(db: Session, skip: int = 0, limit: int = 10):
        return db.query(models.User).offset(skip).limit(limit).all()


    def create_user(db: Session, user: schemas.UserCreate):
        # Hash password before storing
        hashed = auth.hash_password(user.password)

        db_user = models.User(
            name=user.name,
            email=user.email,
            password=hashed,        # never store plain password
            age=user.age,
            city=user.city
        )

        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user


    def authenticate_user(db: Session, email: str, password: str):
        """Verify email and password — return user if valid, None if not."""
        user = get_user_by_email(db, email)
        if not user:
            return None
        if not auth.verify_password(password, user.password):
            return None
        return user


    def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        update_data = user.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_user, field, value)
        db.commit()
        db.refresh(db_user)
        return db_user


    def delete_user(db: Session, user_id: int):
        db_user = get_user(db, user_id)
        if not db_user:
            return None
        db.delete(db_user)
        db.commit()
        return db_user


    # ── Post CRUD ─────────────────────────────────────

    def get_post(db: Session, post_id: int):
        return db.query(models.Post).filter(models.Post.id == post_id).first()


    def get_posts(
        db: Session,
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        author_id: int | None = None
    ):
        query = db.query(models.Post)
        if published is not None:
            query = query.filter(models.Post.published == published)
        if author_id is not None:
            query = query.filter(models.Post.author_id == author_id)
        return query.offset(skip).limit(limit).all()


    def create_post(db: Session, post: schemas.PostCreate, author_id: int):
        db_post = models.Post(
            title=post.title,
            content=post.content,
            author_id=author_id         # use logged-in user's id
        )
        db.add(db_post)
        db.commit()
        db.refresh(db_post)
        return db_post


    def update_post(db: Session, post_id: int, post: schemas.PostUpdate):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        update_data = post.model_dump(exclude_none=True)
        for field, value in update_data.items():
            setattr(db_post, field, value)
        db.commit()
        db.refresh(db_post)
        return db_post


    def delete_post(db: Session, post_id: int):
        db_post = get_post(db, post_id)
        if not db_post:
            return None
        db.delete(db_post)
        db.commit()
        return db_post


Step 6 — main.py — Complete with Auth Routes


    # main.py

    from fastapi import FastAPI, HTTPException, Depends, status
    from fastapi.security import OAuth2PasswordRequestForm
    from sqlalchemy.orm import Session
    import models
    import schemas
    import crud
    import auth
    from database import engine, get_db
    from dependencies import get_current_user

    models.Base.metadata.create_all(bind=engine)

    app = FastAPI(title="Authenticated API", version="4.0.0")


    # ── Auth Routes ───────────────────────────────────

    @app.post("/auth/register", response_model=schemas.UserResponse, status_code=201)
    def register(user: schemas.UserCreate, db: Session = Depends(get_db)):
        existing = crud.get_user_by_email(db, user.email)
        if existing:
            raise HTTPException(status_code=400, detail="Email already registered")
        return crud.create_user(db, user)


    @app.post("/auth/login", response_model=schemas.TokenResponse)
    def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
        user = crud.authenticate_user(db, credentials.email, credentials.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid email or password"
            )

        token = auth.create_access_token(data={"user_id": user.id})

        return {
            "access_token": token,
            "token_type": "bearer",
            "user": user
        }


    # OAuth2 login — for /docs Authorize button to work
    @app.post("/auth/token")
    def login_for_docs(
        form_data: OAuth2PasswordRequestForm = Depends(),
        db: Session = Depends(get_db)
    ):
        user = crud.authenticate_user(db, form_data.username, form_data.password)
        if not user:
            raise HTTPException(status_code=401, detail="Invalid credentials")
        token = auth.create_access_token(data={"user_id": user.id})
        return {"access_token": token, "token_type": "bearer"}


    # ── User Routes ───────────────────────────────────

    @app.get("/users", response_model=list[schemas.UserResponse])
    def get_users(
        skip: int = 0,
        limit: int = 10,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)    # protected route
    ):
        return crud.get_users(db, skip, limit)


    @app.get("/users/me", response_model=schemas.UserResponse)
    def get_me(current_user = Depends(get_current_user)):
        """Get currently logged in user's profile."""
        return current_user


    @app.get("/users/{user_id}", response_model=schemas.UserResponse)
    def get_user(
        user_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        user = crud.get_user(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user


    @app.patch("/users/me", response_model=schemas.UserResponse)
    def update_me(
        user_update: schemas.UserUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Update currently logged in user's profile."""
        return crud.update_user(db, current_user.id, user_update)


    @app.delete("/users/me", status_code=status.HTTP_204_NO_CONTENT)
    def delete_me(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Delete currently logged in user's account."""
        crud.delete_user(db, current_user.id)


    # ── Post Routes ───────────────────────────────────

    @app.post("/posts", response_model=schemas.PostResponse, status_code=201)
    def create_post(
        post: schemas.PostCreate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        return crud.create_post(db, post, author_id=current_user.id)


    @app.get("/posts", response_model=list[schemas.PostResponse])
    def get_posts(
        skip: int = 0,
        limit: int = 10,
        published: bool | None = None,
        db: Session = Depends(get_db)
        # no auth required — anyone can read posts
    ):
        return crud.get_posts(db, skip, limit, published)


    @app.get("/posts/my", response_model=list[schemas.PostResponse])
    def get_my_posts(
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        """Get all posts by currently logged in user."""
        return crud.get_posts(db, author_id=current_user.id)


    @app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
    def get_post(post_id: int, db: Session = Depends(get_db)):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")
        return post


    @app.patch("/posts/{post_id}", response_model=schemas.PostResponse)
    def update_post(
        post_id: int,
        post_update: schemas.PostUpdate,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can update their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to update this post")

        return crud.update_post(db, post_id, post_update)


    @app.delete("/posts/{post_id}", status_code=status.HTTP_204_NO_CONTENT)
    def delete_post(
        post_id: int,
        db: Session = Depends(get_db),
        current_user = Depends(get_current_user)
    ):
        post = crud.get_post(db, post_id)
        if not post:
            raise HTTPException(status_code=404, detail="Post not found")

        # Only author can delete their own post
        if post.author_id != current_user.id:
            raise HTTPException(status_code=403, detail="Not authorized to delete this post")

        crud.delete_post(db, post_id)


Testing Auth Flow in /docs

Step 1 — Register:

POST /auth/register
{
    "name": "Gagan Singh",
    "email": "gagan@email.com",
    "password": "secret123",
    "age": 22,
    "city": "Delhi"
}

Step 2 — Login:

POST /auth/login
{
    "email": "gagan@email.com",
    "password": "secret123"
}

Response:

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
    "token_type": "bearer",
    "user": {...}
}

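Step 3 — Authorize in /docs: Click the Authorize button (lock icon) at the top right of /docs. Enter your email in the username field and your password. /docs then requests a token from the tokenUrl configured in dependencies.py and attaches it to later requests. Now all protected routes work.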

Step 4 — Test protected routes:

  • GET /users/me — returns your profile
  • POST /posts — creates a post as you
  • PATCH /posts/1 — works only if you're the author

Using Token in Thunder Client / Postman

Add this header to every protected request:

Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

This is the standard Bearer token format, the same one a NestJS API expects from its clients.
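The same header can be attached from a script. This is a minimal sketch using only the standard library; the URL assumes the API is running locally on port 8000, and the token value is a placeholder to be replaced with the access_token from the login response.

```python
import urllib.request

API_URL = "http://localhost:8000/users/me"  # assumed local development address


def build_authorized_request(url: str, token: str) -> urllib.request.Request:
    """Attach the JWT as a Bearer token in the Authorization header."""
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


request = build_authorized_request(API_URL, "<paste-your-token-here>")
print(request.get_header("Authorization"))

# To actually send it once the server is running:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
```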


Environment Variables — Secure Your Secret Key

Never hardcode secrets. Use environment variables:

Create a .env file:

SECRET_KEY=your-super-secret-key-change-this-in-production-make-it-long
DATABASE_URL=sqlite:///./app.db
ACCESS_TOKEN_EXPIRE_MINUTES=30

Install python-dotenv:

pip install python-dotenv

Create config.py:

# config.py

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    secret_key: str
    database_url: str = "sqlite:///./app.db"
    access_token_expire_minutes: int = 30

    # Pydantic v2 style; replaces the deprecated inner "class Config"
    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()

Install pydantic-settings:

pip install pydantic-settings

Update auth.py to use settings:

from config import settings

SECRET_KEY = settings.secret_key
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes

Update database.py:

from config import settings
DATABASE_URL = settings.database_url

Add .env to .gitignore — never commit secrets to git.


Role-Based Access — Admin vs User

A common pattern — restricting certain routes to admin users only:

Add role to User model:

class User(Base):
    # ... existing fields
    role = Column(String(20), default="user")    # "user" or "admin"

Create admin dependency in dependencies.py:

def get_admin_user(current_user = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required"
        )
    return current_user

Use it on admin-only routes:

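@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(
    user_id: int,
    db: Session = Depends(get_db),
    admin = Depends(get_admin_user)    # only admins
):
    # crud.delete_user returns None when the user does not exist
    if crud.delete_user(db, user_id) is None:
        raise HTTPException(status_code=404, detail="User not found")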

This is the same pattern as Guards in NestJS.
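When more than two roles are involved, the check can be generated by a small factory instead of writing one dependency per role. The sketch below is framework-free so it runs on its own: Forbidden stands in for HTTPException(status_code=403), and DemoUser and require_role are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403) in this framework-free sketch."""


@dataclass
class DemoUser:
    id: int
    role: str = "user"


def require_role(*allowed: str) -> Callable[[DemoUser], DemoUser]:
    """Return a checker that only lets users with one of the allowed roles through."""
    def checker(current_user: DemoUser) -> DemoUser:
        if current_user.role not in allowed:
            raise Forbidden(f"Requires one of: {', '.join(allowed)}")
        return current_user
    return checker


admin_only = require_role("admin")
owner_or_admin = require_role("owner", "admin")

print(admin_only(DemoUser(id=1, role="admin")).id)
print(owner_or_admin(DemoUser(id=2, role="owner")).id)
```

In FastAPI, checker would declare current_user = Depends(get_current_user) and raise HTTPException with status 403, so a route could use Depends(require_role("owner", "admin")).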


Password Reset Flow — How It Works

In real apps password reset needs:

  1. User requests reset with their email
  2. Generate a temporary token, save it, email it to user
  3. User clicks link with token
  4. Verify token, let user set new password

Basic implementation:

import secrets

def create_password_reset_token() -> str:
    return secrets.token_urlsafe(32)    # secure random token

@app.post("/auth/forgot-password")
def forgot_password(email: str, db: Session = Depends(get_db)):
    user = crud.get_user_by_email(db, email)
    if not user:
        # Don't reveal if email exists or not — security best practice
        return {"message": "If that email exists, a reset link has been sent"}

    reset_token = create_password_reset_token()
    # In real app: save token to database with expiry, send email
    # For now just return it (in production NEVER return it directly)
    return {"reset_token": reset_token, "message": "Use this token to reset password"}

To send emails, you could use the fastapi-mail library. We'll save that for the final project.
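Steps 2 and 4 of the reset flow (save the token with an expiry, then verify it) could look roughly like the sketch below. The in-memory dictionary is a stand-in for a database table, the 30-minute lifetime is an assumed value, and all names are hypothetical.

```python
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone

RESET_TOKEN_TTL = timedelta(minutes=30)  # assumed lifetime for this sketch

# Hypothetical in-memory store: user_id -> (token_hash, expires_at)
_reset_tokens: dict[int, tuple[str, datetime]] = {}


def _hash(token: str) -> str:
    """Store only a hash, so a leaked table does not reveal usable tokens."""
    return hashlib.sha256(token.encode()).hexdigest()


def issue_reset_token(user_id: int) -> str:
    """Create a token, remember its hash and expiry, and return it for emailing."""
    token = secrets.token_urlsafe(32)
    expires_at = datetime.now(timezone.utc) + RESET_TOKEN_TTL
    _reset_tokens[user_id] = (_hash(token), expires_at)
    return token


def consume_reset_token(user_id: int, token: str) -> bool:
    """Return True once for a valid, unexpired token; False otherwise."""
    entry = _reset_tokens.get(user_id)
    if entry is None:
        return False
    token_hash, expires_at = entry
    if datetime.now(timezone.utc) > expires_at:
        del _reset_tokens[user_id]  # expired tokens are discarded
        return False
    if not hmac.compare_digest(token_hash, _hash(token)):
        return False
    del _reset_tokens[user_id]  # single use: a token cannot be replayed
    return True


emailed = issue_reset_token(user_id=1)
print(consume_reset_token(1, "not-the-token"))
print(consume_reset_token(1, emailed))
print(consume_reset_token(1, emailed))
```

Only after consume_reset_token returns True would the route hash and save the new password.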


Security Checklist

Things every production API must have:

✅ Passwords hashed with bcrypt — done
✅ JWT tokens with expiry — done
✅ Protected routes — done
✅ Author-only post editing — done
✅ Role-based access — done
✅ Secrets in environment variables — done
✅ Email normalization — done (lowercase in validator)
❌ Rate limiting — prevents brute force attacks
❌ HTTPS — always in production
❌ Input sanitization — validate untrusted input beyond SQL (SQLAlchemy's parameterized queries already guard against SQL injection)
❌ Refresh tokens — long-lived sessions

Rate limiting example with slowapi:

pip install slowapi

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/auth/login")
@limiter.limit("5/minute")    # max 5 login attempts per minute
def login(request: Request, credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
    ...

Final Project Structure

fastapi-learning/
├── venv/
├── .env                   ← secrets (never commit)
├── .gitignore
├── main.py                ← routes
├── database.py            ← db connection
├── models.py              ← SQLAlchemy tables
├── schemas.py             ← Pydantic models
├── crud.py                ← db operations
├── auth.py                ← JWT and password logic
├── dependencies.py        ← reusable dependencies
├── config.py              ← settings from .env
└── requirements.txt

Exercise 🏋️

Add these features to the current project:

1. Change Password Route:

PATCH /users/me/password
{
    "current_password": "secret123",
    "new_password": "newSecret456"
}
  • Verify current password before allowing change
  • Hash and save new password

2. Deactivate Account:

PATCH /users/me/deactivate
  • Sets is_active = False
  • Deactivated users cannot log in

3. Admin Routes:

  • GET /admin/users — list all users including inactive (admin only)
  • DELETE /admin/users/{id} — force delete any user (admin only)
  • PATCH /admin/users/{id}/activate — reactivate a deactivated user

4. Post Ownership:

  • Regular users can only edit/delete their own posts
  • Admin can edit/delete any post
