What We're Building
A complete Task Management API: the backend for a small Trello- or Jira-style app.
This project combines everything from the previous six stages in a realistic, production-style structure. When you finish, you'll have a portfolio project you can show.
Features
✅ User registration and login (JWT auth)
✅ Workspaces — users create workspaces, invite members
✅ Projects — inside workspaces
✅ Tasks — inside projects, assign to members
✅ Comments — on tasks
✅ File attachments — on tasks
✅ Role-based access with three roles: owner, admin, member
✅ Background email notifications
✅ Pagination on all list endpoints
✅ Full filtering and search
✅ Consistent API responses
✅ Complete /docs documentation
Final Project Structure
task-manager-api/
├── routers/
│ ├── __init__.py
│ ├── auth.py
│ ├── users.py
│ ├── workspaces.py
│ ├── projects.py
│ ├── tasks.py
│ └── comments.py
├── uploads/
├── venv/
├── .env
├── .gitignore
├── main.py
├── database.py
├── models.py
├── schemas.py
├── crud.py
├── auth_utils.py
├── dependencies.py
├── config.py
├── utils.py
└── requirements.txt
Step 1 — Setup
mkdir task-manager-api
cd task-manager-api
python -m venv venv
venv\Scripts\activate # Windows
source venv/bin/activate # Mac/Linux
pip install fastapi uvicorn sqlalchemy "passlib[bcrypt]" "python-jose[cryptography]" python-multipart python-dotenv pydantic-settings # quotes stop shells such as zsh from expanding the brackets
pip freeze > requirements.txt
Create .env:
SECRET_KEY=super-secret-key-make-this-very-long-and-random-in-production
DATABASE_URL=sqlite:///./taskmanager.db
ACCESS_TOKEN_EXPIRE_MINUTES=60
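The SECRET_KEY value above is only a placeholder. One way to produce a random value is Python's standard `secrets` module, sketched below. The helper name `generate_secret_key` and the 32-byte length are assumptions for illustration, not requirements of this project.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random hex string that can be pasted into .env as SECRET_KEY."""
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    # Prints a line in the same KEY=value shape that .env uses
    print(f"SECRET_KEY={generate_secret_key()}")
```

Run it once, copy the printed line into .env, and keep that file out of version control.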
Create .gitignore:
venv/
__pycache__/
*.pyc
.env
*.db
uploads/
Step 2 — config.py
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    # pydantic-settings v2 style; the inner `class Config` form is deprecated
    model_config = SettingsConfigDict(env_file=".env")

    secret_key: str
    database_url: str = "sqlite:///./taskmanager.db"
    access_token_expire_minutes: int = 60


settings = Settings()
Step 3 — database.py
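# database.py
from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the sqlalchemy.ext.declarative import path is deprecated
from sqlalchemy.orm import declarative_base, sessionmaker
from config import settings

# check_same_thread is a SQLite-only option, so pass it only for SQLite URLs
connect_args = (
    {"check_same_thread": False}
    if settings.database_url.startswith("sqlite")
    else {}
)
engine = create_engine(settings.database_url, connect_args=connect_args)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    """Yield one session per request and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()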
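get_db is a generator dependency: the code before `yield` sets the session up, the route uses the yielded value, and the `finally` block runs once the generator is closed. The sketch below drives the same pattern by hand, using the standard-library `sqlite3` module in place of SQLAlchemy. The names `get_conn` and `events` are illustrative only.

```python
import sqlite3

events = []  # records the order of setup, use, and teardown


def get_conn():
    """Same shape as get_db: set up, yield, always clean up."""
    conn = sqlite3.connect(":memory:")
    events.append("opened")
    try:
        yield conn
    finally:
        conn.close()
        events.append("closed")


# Roughly what a framework does with such a dependency:
gen = get_conn()
conn = next(gen)  # run up to the yield and receive the connection
value = conn.execute("SELECT 1").fetchone()[0]
events.append(f"query result: {value}")
gen.close()  # raises GeneratorExit at the yield, so the finally block runs

print(events)
```

The cleanup step runs even if the code using the connection raises, which is why the session cannot leak.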
Step 4 — models.py
This is the core of the project: every database table is defined here.
# models.py
from sqlalchemy import (
Column, Integer, String, Boolean, DateTime,
Text, ForeignKey, Enum as SQLEnum
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from database import Base
import enum
# ── Enums ─────────────────────────────────────────
class WorkspaceRole(str, enum.Enum):
owner = "owner"
admin = "admin"
member = "member"
class TaskStatus(str, enum.Enum):
todo = "todo"
in_progress = "in_progress"
in_review = "in_review"
done = "done"
class TaskPriority(str, enum.Enum):
low = "low"
medium = "medium"
high = "high"
urgent = "urgent"
# ── Tables ────────────────────────────────────────
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, nullable=False, index=True)
password = Column(String(255), nullable=False)
avatar = Column(String(255), nullable=True)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, server_default=func.now())
workspace_memberships = relationship("WorkspaceMember", back_populates="user")
assigned_tasks = relationship("Task", back_populates="assignee", foreign_keys="Task.assignee_id")
comments = relationship("Comment", back_populates="author")
class Workspace(Base):
__tablename__ = "workspaces"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
created_at = Column(DateTime, server_default=func.now())
owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
owner = relationship("User")
members = relationship("WorkspaceMember", back_populates="workspace")
projects = relationship("Project", back_populates="workspace")
class WorkspaceMember(Base):
__tablename__ = "workspace_members"
id = Column(Integer, primary_key=True, index=True)
workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
role = Column(SQLEnum(WorkspaceRole), default=WorkspaceRole.member)
joined_at = Column(DateTime, server_default=func.now())
workspace = relationship("Workspace", back_populates="members")
user = relationship("User", back_populates="workspace_memberships")
class Project(Base):
    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, server_default=func.now())
    workspace_id = Column(Integer, ForeignKey("workspaces.id"), nullable=False)

    workspace = relationship("Workspace", back_populates="projects")
    # Without a delete cascade, deleting a project makes SQLAlchemy set
    # tasks.project_id to NULL, which the NOT NULL column rejects
    tasks = relationship("Task", back_populates="project", cascade="all, delete-orphan")


class Task(Base):
    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(Text, nullable=True)
    status = Column(SQLEnum(TaskStatus), default=TaskStatus.todo)
    priority = Column(SQLEnum(TaskPriority), default=TaskPriority.medium)
    due_date = Column(DateTime, nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
    project_id = Column(Integer, ForeignKey("projects.id"), nullable=False)
    creator_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    assignee_id = Column(Integer, ForeignKey("users.id"), nullable=True)

    project = relationship("Project", back_populates="tasks")
    creator = relationship("User", foreign_keys=[creator_id])
    assignee = relationship("User", back_populates="assigned_tasks", foreign_keys=[assignee_id])
    # Same reason as Project.tasks: remove child rows together with the task
    comments = relationship("Comment", back_populates="task", cascade="all, delete-orphan")
    attachments = relationship("Attachment", back_populates="task", cascade="all, delete-orphan")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
author_id = Column(Integer, ForeignKey("users.id"), nullable=False)
task = relationship("Task", back_populates="comments")
author = relationship("User", back_populates="comments")
class Attachment(Base):
__tablename__ = "attachments"
id = Column(Integer, primary_key=True, index=True)
filename = Column(String(255), nullable=False)
original_name = Column(String(255), nullable=False)
file_size = Column(Integer, nullable=False)
content_type = Column(String(100), nullable=False)
uploaded_at = Column(DateTime, server_default=func.now())
task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
uploaded_by = Column(Integer, ForeignKey("users.id"), nullable=False)
task = relationship("Task", back_populates="attachments")
uploader = relationship("User")
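The enums in models.py inherit from both `str` and `enum.Enum`. That mixin is why members compare equal to plain strings and serialize as strings. A small standalone sketch, with TaskStatus redefined locally so it runs on its own:

```python
import enum
import json


class TaskStatus(str, enum.Enum):
    todo = "todo"
    in_progress = "in_progress"
    in_review = "in_review"
    done = "done"


# Lookup by value: this is how an incoming "done" string becomes a member
status = TaskStatus("done")
print(status is TaskStatus.done)

# Because of the str mixin, members compare equal to plain strings
print(status == "done")

# The json module writes the member out as its string value
print(json.dumps({"status": status}))

# Values outside the enum are rejected with ValueError
try:
    TaskStatus("archived")
except ValueError as exc:
    print(f"rejected: {exc}")
```

The same lookup-by-value step is what lets a query parameter such as `?status=done` be validated against the enum before it reaches the database.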
Step 5 — auth_utils.py
# auth_utils.py
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
from config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)


def create_access_token(data: dict) -> str:
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC time
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.access_token_expire_minutes
    )
    payload.update({"exp": expire})
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


def verify_token(token: str) -> dict:
    """Decode the JWT, or raise 401 if it is invalid or expired."""
    try:
        return jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"}
        )
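To see what create_access_token actually produces, the sketch below builds and checks an HS256 token by hand with the standard library. It is for illustration only and is not a replacement for python-jose. The function names `sign_hs256` and `verify_hs256` and the secret "demo-secret" are made up for this example.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url without padding, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Return header.payload.signature, each part base64url-encoded."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Check signature and expiry; raise ValueError if either fails."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload


token = sign_hs256({"user_id": 1, "exp": int(time.time()) + 3600}, "demo-secret")
print(token.count("."))  # two dots separate the three parts
print(verify_hs256(token, "demo-secret")["user_id"])
```

Note that the payload is only encoded, not encrypted: anyone holding the token can read `user_id`, so a token should never carry secrets. The signature only proves the payload was not altered.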
Step 6 — schemas.py
# schemas.py
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Generic, TypeVar
from models import TaskStatus, TaskPriority, WorkspaceRole
T = TypeVar("T")
# ── Generic Response Wrapper ──────────────────────
class APIResponse(BaseModel, Generic[T]):
success: bool = True
message: str = "Success"
data: T | None = None
class PaginatedResponse(BaseModel, Generic[T]):
success: bool = True
data: list[T] = []
total: int = 0
page: int = 1
per_page: int = 10
total_pages: int = 0
has_next: bool = False
has_prev: bool = False
# ── User Schemas ─────────────────────────────────
class UserCreate(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    email: str
    password: str = Field(min_length=8)

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        return v.lower().strip()

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if v.isdigit():
            raise ValueError("Password cannot be only numbers")
        # isalpha() would let "abcdefg!" through, so test for a digit directly
        if not any(ch.isdigit() for ch in v):
            raise ValueError("Password must include at least one number")
        return v
class UserUpdate(BaseModel):
name: str | None = Field(default=None, min_length=2)
class UserResponse(BaseModel):
id: int
name: str
email: str
avatar: str | None
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
class LoginRequest(BaseModel):
email: str
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
user: UserResponse
# ── Workspace Schemas ─────────────────────────────
class WorkspaceCreate(BaseModel):
name: str = Field(min_length=2, max_length=100)
description: str | None = Field(default=None, max_length=500)
class WorkspaceUpdate(BaseModel):
name: str | None = Field(default=None, min_length=2)
description: str | None = None
class WorkspaceMemberResponse(BaseModel):
id: int
user: UserResponse
role: WorkspaceRole
joined_at: datetime
model_config = {"from_attributes": True}
class WorkspaceResponse(BaseModel):
id: int
name: str
description: str | None
owner_id: int
created_at: datetime
member_count: int = 0
model_config = {"from_attributes": True}
class InviteMember(BaseModel):
email: str
role: WorkspaceRole = WorkspaceRole.member
# ── Project Schemas ───────────────────────────────
class ProjectCreate(BaseModel):
name: str = Field(min_length=2, max_length=100)
description: str | None = Field(default=None, max_length=500)
class ProjectUpdate(BaseModel):
name: str | None = Field(default=None, min_length=2)
description: str | None = None
is_active: bool | None = None
class ProjectResponse(BaseModel):
id: int
name: str
description: str | None
is_active: bool
workspace_id: int
created_at: datetime
task_count: int = 0
model_config = {"from_attributes": True}
# ── Task Schemas ──────────────────────────────────
class TaskCreate(BaseModel):
title: str = Field(min_length=3, max_length=200)
description: str | None = None
priority: TaskPriority = TaskPriority.medium
due_date: datetime | None = None
assignee_id: int | None = None
class TaskUpdate(BaseModel):
title: str | None = Field(default=None, min_length=3)
description: str | None = None
status: TaskStatus | None = None
priority: TaskPriority | None = None
due_date: datetime | None = None
assignee_id: int | None = None
class TaskResponse(BaseModel):
id: int
title: str
description: str | None
status: TaskStatus
priority: TaskPriority
due_date: datetime | None
project_id: int
creator_id: int
assignee_id: int | None
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
class TaskDetailResponse(BaseModel):
id: int
title: str
description: str | None
status: TaskStatus
priority: TaskPriority
due_date: datetime | None
created_at: datetime
updated_at: datetime
creator: UserResponse
assignee: UserResponse | None
comment_count: int = 0
model_config = {"from_attributes": True}
# ── Comment Schemas ───────────────────────────────
class CommentCreate(BaseModel):
content: str = Field(min_length=1, max_length=2000)
class CommentUpdate(BaseModel):
content: str = Field(min_length=1, max_length=2000)
class CommentResponse(BaseModel):
id: int
content: str
task_id: int
author: UserResponse
created_at: datetime
updated_at: datetime
model_config = {"from_attributes": True}
# ── Attachment Schema ─────────────────────────────
class AttachmentResponse(BaseModel):
id: int
filename: str
original_name: str
file_size: int
content_type: str
uploaded_at: datetime
task_id: int
model_config = {"from_attributes": True}
Step 7 — utils.py
# utils.py
def paginate(query, page: int = 1, per_page: int = 10) -> dict:
    """Apply pagination to any SQLAlchemy query."""
    # Guard against page=0 or per_page=0, which would otherwise produce
    # a negative offset or a division by zero
    page = max(page, 1)
    per_page = max(per_page, 1)
    total = query.count()
    total_pages = (total + per_page - 1) // per_page
    items = query.offset((page - 1) * per_page).limit(per_page).all()
    return {
        "data": items,
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1
    }


def send_notification_email(to_email: str, subject: str, body: str):
    """Simulate sending email. Replace with a real email library in production."""
    print(f"\n📧 EMAIL TO: {to_email}")
    print(f" SUBJECT: {subject}")
    print(f" BODY: {body}\n")
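The arithmetic inside paginate is easier to check without a database. The sketch below isolates it in a pure function (the name `page_window` is illustrative) and walks through 25 items at 10 per page.

```python
def page_window(total: int, page: int, per_page: int) -> dict:
    """Pure-arithmetic version of paginate(): no query object needed."""
    total_pages = (total + per_page - 1) // per_page  # ceiling division
    return {
        "offset": (page - 1) * per_page,  # rows to skip before this page
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


for page in (1, 2, 3):
    print(page, page_window(total=25, page=page, per_page=10))
```

With 25 items, page 3 starts at offset 20 and holds the last 5 rows. The ceiling division is what turns 25 items into 3 pages rather than 2.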
Step 8 — dependencies.py
# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from database import get_db
import auth_utils
import models
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> models.User:
payload = auth_utils.verify_token(token)
user_id = payload.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Invalid token")
user = db.query(models.User).filter(models.User.id == user_id).first()
if not user:
raise HTTPException(status_code=401, detail="User not found")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account deactivated")
return user
def get_workspace_member(
workspace_id: int,
current_user: models.User = Depends(get_current_user),
db: Session = Depends(get_db)
) -> models.WorkspaceMember:
"""Verify user is a member of the workspace."""
membership = db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == workspace_id,
models.WorkspaceMember.user_id == current_user.id
).first()
if not membership:
raise HTTPException(
status_code=403,
detail="You are not a member of this workspace"
)
return membership
def get_workspace_admin(
membership: models.WorkspaceMember = Depends(get_workspace_member)
) -> models.WorkspaceMember:
"""Verify user is admin or owner of the workspace."""
if membership.role not in [models.WorkspaceRole.owner, models.WorkspaceRole.admin]:
raise HTTPException(
status_code=403,
detail="Admin access required"
)
return membership
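The admin check in get_workspace_admin is a plain membership test on the role. Pulling that decision into a small pure function, as sketched below, makes it easy to unit-test without a database or a request. `ADMIN_ROLES` and `can_administer` are hypothetical names, and WorkspaceRole is redefined locally so the block runs on its own.

```python
import enum


class WorkspaceRole(str, enum.Enum):
    owner = "owner"
    admin = "admin"
    member = "member"


# Roles allowed to manage a workspace, mirroring get_workspace_admin
ADMIN_ROLES = frozenset({WorkspaceRole.owner, WorkspaceRole.admin})


def can_administer(role: WorkspaceRole) -> bool:
    """True when the role may invite members or edit the workspace."""
    return role in ADMIN_ROLES


for role in WorkspaceRole:
    print(f"{role.value}: {'allowed' if can_administer(role) else '403'}")
```

Because get_workspace_admin depends on get_workspace_member, a non-member is rejected before the role is ever inspected.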
Step 9 — All Routers
routers/auth.py
# routers/auth.py
from fastapi import APIRouter, HTTPException, Depends, status
from sqlalchemy.orm import Session
import models
import schemas
import auth_utils
from database import get_db
from dependencies import get_current_user
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/register", response_model=schemas.UserResponse, status_code=201)
def register(user_data: schemas.UserCreate, db: Session = Depends(get_db)):
if db.query(models.User).filter(models.User.email == user_data.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
hashed = auth_utils.hash_password(user_data.password)
user = models.User(
name=user_data.name,
email=user_data.email,
password=hashed
)
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/login", response_model=schemas.TokenResponse)
def login(credentials: schemas.LoginRequest, db: Session = Depends(get_db)):
user = db.query(models.User).filter(
models.User.email == credentials.email.lower()
).first()
if not user or not auth_utils.verify_password(credentials.password, user.password):
raise HTTPException(status_code=401, detail="Invalid email or password")
if not user.is_active:
raise HTTPException(status_code=403, detail="Account deactivated")
token = auth_utils.create_access_token({"user_id": user.id})
return {"access_token": token, "token_type": "bearer", "user": user}
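# The /docs Authorize button uses the OAuth2 password flow, which posts
# form fields (username, password) rather than JSON, so this route reads
# OAuth2PasswordRequestForm. Enter the email in the "username" field.
# (This import can move to the top of the file with the others.)
from fastapi.security import OAuth2PasswordRequestForm


@router.post("/token")
def login_for_docs(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    user = db.query(models.User).filter(
        models.User.email == form_data.username.lower()
    ).first()
    if not user or not auth_utils.verify_password(form_data.password, user.password):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = auth_utils.create_access_token({"user_id": user.id})
    return {"access_token": token, "token_type": "bearer"}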
@router.get("/me", response_model=schemas.UserResponse)
def get_me(current_user: models.User = Depends(get_current_user)):
return current_user
@router.patch("/me", response_model=schemas.UserResponse)
def update_me(
update: schemas.UserUpdate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
if update.name:
current_user.name = update.name
db.commit()
db.refresh(current_user)
return current_user
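The Authorize button in /docs follows the OAuth2 password flow, which submits form fields rather than a JSON document. The sketch below only builds the two kinds of request body so the difference is visible; it sends nothing, and the credentials are made up.

```python
import json
from urllib.parse import parse_qs, urlencode

email, password = "ada@example.com", "correct-horse-1"

# A JSON login body, the shape that LoginRequest describes
json_body = json.dumps({"email": email, "password": password})

# The OAuth2 password flow sends form fields instead, and the identifier
# field is always called "username", even when it holds an email address
form_body = urlencode({"username": email, "password": password})

print("Content-Type: application/json")
print(json_body)
print("Content-Type: application/x-www-form-urlencoded")
print(form_body)

# Decoding the form body recovers the original values
print(parse_qs(form_body)["username"][0])
```

A route that expects one of these shapes will reject the other with a validation error, so the token URL has to accept the form variant.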
routers/workspaces.py
# routers/workspaces.py
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate, send_notification_email
router = APIRouter(prefix="/workspaces", tags=["Workspaces"])
@router.post("", response_model=schemas.WorkspaceResponse, status_code=201)
def create_workspace(
data: schemas.WorkspaceCreate,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
workspace = models.Workspace(
name=data.name,
description=data.description,
owner_id=current_user.id
)
db.add(workspace)
db.flush() # get ID before commit
# Add creator as owner member
member = models.WorkspaceMember(
workspace_id=workspace.id,
user_id=current_user.id,
role=models.WorkspaceRole.owner
)
db.add(member)
db.commit()
db.refresh(workspace)
workspace.member_count = 1
return workspace
@router.get("", response_model=schemas.PaginatedResponse[schemas.WorkspaceResponse])
def get_my_workspaces(
page: int = 1,
per_page: int = 10,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
query = db.query(models.Workspace).join(
models.WorkspaceMember
).filter(
models.WorkspaceMember.user_id == current_user.id
)
result = paginate(query, page, per_page)
for ws in result["data"]:
ws.member_count = db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == ws.id
).count()
return result
@router.get("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def get_workspace(
workspace_id: int,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_member)
):
workspace = db.query(models.Workspace).filter(
models.Workspace.id == workspace_id
).first()
workspace.member_count = db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == workspace_id
).count()
return workspace
@router.patch("/{workspace_id}", response_model=schemas.WorkspaceResponse)
def update_workspace(
workspace_id: int,
data: schemas.WorkspaceUpdate,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
workspace = db.query(models.Workspace).filter(
models.Workspace.id == workspace_id
).first()
if data.name:
workspace.name = data.name
if data.description is not None:
workspace.description = data.description
db.commit()
db.refresh(workspace)
workspace.member_count = len(workspace.members)
return workspace
@router.post("/{workspace_id}/invite", response_model=schemas.WorkspaceMemberResponse)
def invite_member(
workspace_id: int,
invite: schemas.InviteMember,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
# Find user by email
user = db.query(models.User).filter(
models.User.email == invite.email.lower()
).first()
if not user:
raise HTTPException(status_code=404, detail="User with this email not found")
# Check already member
existing = db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == workspace_id,
models.WorkspaceMember.user_id == user.id
).first()
if existing:
raise HTTPException(status_code=400, detail="User is already a member")
# Add member
new_member = models.WorkspaceMember(
workspace_id=workspace_id,
user_id=user.id,
role=invite.role
)
db.add(new_member)
db.commit()
db.refresh(new_member)
# Send notification in background
workspace = db.query(models.Workspace).filter(
models.Workspace.id == workspace_id
).first()
background_tasks.add_task(
send_notification_email,
user.email,
f"You've been added to {workspace.name}",
f"Hi {user.name}, you've been added as {invite.role} to workspace '{workspace.name}'"
)
return new_member
@router.get("/{workspace_id}/members", response_model=list[schemas.WorkspaceMemberResponse])
def get_members(
workspace_id: int,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_member)
):
return db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == workspace_id
).all()
@router.delete("/{workspace_id}/members/{user_id}", status_code=204)
def remove_member(
workspace_id: int,
user_id: int,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
member = db.query(models.WorkspaceMember).filter(
models.WorkspaceMember.workspace_id == workspace_id,
models.WorkspaceMember.user_id == user_id
).first()
if not member:
raise HTTPException(status_code=404, detail="Member not found")
if member.role == models.WorkspaceRole.owner:
raise HTTPException(status_code=400, detail="Cannot remove workspace owner")
db.delete(member)
db.commit()
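get_my_workspaces runs one extra COUNT query per workspace on the page. That is fine for a handful of rows, but the counts can also be fetched in a single grouped query. The sketch below shows the idea with the standard-library `sqlite3` module and a cut-down, assumed version of the two tables; the same JOIN and GROUP BY can be expressed in SQLAlchemy.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE workspaces (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE workspace_members (
        id INTEGER PRIMARY KEY,
        workspace_id INTEGER NOT NULL REFERENCES workspaces(id),
        user_id INTEGER NOT NULL
    );
    INSERT INTO workspaces (id, name) VALUES (1, 'Design'), (2, 'Backend');
    INSERT INTO workspace_members (workspace_id, user_id)
    VALUES (1, 10), (1, 11), (1, 12), (2, 10);
""")


def workspaces_with_counts(conn, user_id):
    """Workspaces the user belongs to, each with its total member count."""
    rows = conn.execute(
        """
        SELECT w.id, w.name, COUNT(m.id) AS member_count
        FROM workspaces AS w
        JOIN workspace_members AS m ON m.workspace_id = w.id
        WHERE w.id IN (
            SELECT workspace_id FROM workspace_members WHERE user_id = ?
        )
        GROUP BY w.id, w.name
        ORDER BY w.id
        """,
        (user_id,),
    ).fetchall()
    return [{"id": r[0], "name": r[1], "member_count": r[2]} for r in rows]


print(workspaces_with_counts(conn, user_id=10))
```

The subquery selects the user's workspaces, while the outer join counts every member of each one, so the count is not limited to the current user's own row.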
routers/projects.py
# routers/projects.py
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member, get_workspace_admin
from utils import paginate
router = APIRouter(prefix="/workspaces/{workspace_id}/projects", tags=["Projects"])
@router.post("", response_model=schemas.ProjectResponse, status_code=201)
def create_project(
workspace_id: int,
data: schemas.ProjectCreate,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
project = models.Project(
name=data.name,
description=data.description,
workspace_id=workspace_id
)
db.add(project)
db.commit()
db.refresh(project)
project.task_count = 0
return project
@router.get("", response_model=schemas.PaginatedResponse[schemas.ProjectResponse])
def get_projects(
workspace_id: int,
page: int = 1,
per_page: int = 10,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_member)
):
query = db.query(models.Project).filter(
models.Project.workspace_id == workspace_id
)
result = paginate(query, page, per_page)
for project in result["data"]:
project.task_count = db.query(models.Task).filter(
models.Task.project_id == project.id
).count()
return result
@router.get("/{project_id}", response_model=schemas.ProjectResponse)
def get_project(
workspace_id: int,
project_id: int,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_member)
):
project = db.query(models.Project).filter(
models.Project.id == project_id,
models.Project.workspace_id == workspace_id
).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
project.task_count = db.query(models.Task).filter(
models.Task.project_id == project_id
).count()
return project
@router.patch("/{project_id}", response_model=schemas.ProjectResponse)
def update_project(
workspace_id: int,
project_id: int,
data: schemas.ProjectUpdate,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
project = db.query(models.Project).filter(
models.Project.id == project_id,
models.Project.workspace_id == workspace_id
).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
update_data = data.model_dump(exclude_none=True)
for field, value in update_data.items():
setattr(project, field, value)
db.commit()
db.refresh(project)
project.task_count = len(project.tasks)
return project
@router.delete("/{project_id}", status_code=204)
def delete_project(
workspace_id: int,
project_id: int,
db: Session = Depends(get_db),
membership: models.WorkspaceMember = Depends(get_workspace_admin)
):
project = db.query(models.Project).filter(
models.Project.id == project_id,
models.Project.workspace_id == workspace_id
).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
db.delete(project)
db.commit()
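update_project builds its changes with `model_dump(exclude_none=True)`, which drops every None. One consequence is that a client cannot clear `description` by sending null, because "field omitted" and "field sent as null" look the same. Pydantic v2 also offers `model_dump(exclude_unset=True)`, which keeps explicitly sent nulls. The sketch below illustrates the distinction with plain JSON; `split_update` and `ALLOWED` are hypothetical names.

```python
import json


def split_update(raw_json: str, allowed: set[str]) -> dict:
    """Return only the fields the client actually sent, keeping explicit nulls."""
    payload = json.loads(raw_json)
    return {key: value for key, value in payload.items() if key in allowed}


ALLOWED = {"name", "description", "is_active"}

# Field omitted: description is not in the result, so it is left alone
print(split_update('{"name": "Roadmap"}', ALLOWED))

# Field sent as null: description is present with None, meaning "clear it"
print(split_update('{"description": null}', ALLOWED))

# Unknown fields are ignored
print(split_update('{"owner_id": 5, "is_active": false}', ALLOWED))
```

If you switch to the exclude_unset behaviour, guard columns that are NOT NULL (such as `name`) so an explicit null there is rejected rather than written.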
routers/tasks.py
# routers/tasks.py
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, File, UploadFile
from sqlalchemy.orm import Session
from sqlalchemy import or_
import os
import uuid
import models
import schemas
from database import get_db
from dependencies import get_current_user, get_workspace_member
from utils import paginate, send_notification_email
from models import TaskStatus, TaskPriority
router = APIRouter(prefix="/projects/{project_id}/tasks", tags=["Tasks"])
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def get_project_or_404(project_id: int, db: Session) -> models.Project:
project = db.query(models.Project).filter(models.Project.id == project_id).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
return project
@router.post("", response_model=schemas.TaskResponse, status_code=201)
def create_task(
project_id: int,
data: schemas.TaskCreate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
project = get_project_or_404(project_id, db)
task = models.Task(
title=data.title,
description=data.description,
priority=data.priority,
due_date=data.due_date,
assignee_id=data.assignee_id,
project_id=project_id,
creator_id=current_user.id
)
db.add(task)
db.commit()
db.refresh(task)
# Notify assignee in background
if data.assignee_id and data.assignee_id != current_user.id:
assignee = db.query(models.User).filter(
models.User.id == data.assignee_id
).first()
if assignee:
background_tasks.add_task(
send_notification_email,
assignee.email,
f"New task assigned: {task.title}",
f"Hi {assignee.name}, you have been assigned task '{task.title}' in project '{project.name}'"
)
return task
@router.get("", response_model=schemas.PaginatedResponse[schemas.TaskResponse])
def get_tasks(
project_id: int,
page: int = 1,
per_page: int = 10,
status: TaskStatus | None = None,
priority: TaskPriority | None = None,
assignee_id: int | None = None,
search: str | None = None,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
get_project_or_404(project_id, db)
query = db.query(models.Task).filter(models.Task.project_id == project_id)
if status:
query = query.filter(models.Task.status == status)
if priority:
query = query.filter(models.Task.priority == priority)
if assignee_id:
query = query.filter(models.Task.assignee_id == assignee_id)
if search:
query = query.filter(
or_(
models.Task.title.ilike(f"%{search}%"),
models.Task.description.ilike(f"%{search}%")
)
)
return paginate(query, page, per_page)
@router.get("/{task_id}", response_model=schemas.TaskDetailResponse)
def get_task(
project_id: int,
task_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.project_id == project_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
task.comment_count = db.query(models.Comment).filter(
models.Comment.task_id == task_id
).count()
return task
@router.patch("/{task_id}", response_model=schemas.TaskResponse)
def update_task(
project_id: int,
task_id: int,
data: schemas.TaskUpdate,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.project_id == project_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
old_assignee_id = task.assignee_id
update_data = data.model_dump(exclude_none=True)
for field, value in update_data.items():
setattr(task, field, value)
db.commit()
db.refresh(task)
# Notify new assignee if changed
if data.assignee_id and data.assignee_id != old_assignee_id:
new_assignee = db.query(models.User).filter(
models.User.id == data.assignee_id
).first()
if new_assignee:
background_tasks.add_task(
send_notification_email,
new_assignee.email,
f"Task reassigned to you: {task.title}",
f"Hi {new_assignee.name}, task '{task.title}' has been assigned to you."
)
return task
@router.delete("/{task_id}", status_code=204)
def delete_task(
project_id: int,
task_id: int,
db: Session = Depends(get_db),
current_user: models.User = Depends(get_current_user)
):
task = db.query(models.Task).filter(
models.Task.id == task_id,
models.Task.project_id == project_id
).first()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
if task.creator_id != current_user.id:
raise HTTPException(status_code=403, detail="Only task creator can delete")
db.delete(task)
db.commit()
@router.post("/{task_id}/attachments", response_model=schemas.AttachmentResponse)
async def upload_attachment(
    project_id: int,
    task_id: int,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    content = await file.read()
    if len(content) > 10 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="File too large. Max 10MB")
    # splitext copes with names that have no dot; split(".")[-1] would
    # return the whole filename in that case. The name may also be missing.
    original_name = file.filename or "upload"
    extension = os.path.splitext(original_name)[1].lower()  # "" or ".pdf"
    unique_name = f"{uuid.uuid4()}{extension}"
    file_path = os.path.join(UPLOAD_DIR, unique_name)
    with open(file_path, "wb") as f:
        f.write(content)
    attachment = models.Attachment(
        filename=unique_name,
        original_name=original_name,
        file_size=len(content),
        content_type=file.content_type or "application/octet-stream",
        task_id=task_id,
        uploaded_by=current_user.id
    )
    db.add(attachment)
    db.commit()
    db.refresh(attachment)
    return attachment
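@router.get("/{task_id}/attachments", response_model=list[schemas.AttachmentResponse])
def get_attachments(
    project_id: int,
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    # Confirm the task exists in this project before listing its files
    task = db.query(models.Task).filter(
        models.Task.id == task_id,
        models.Task.project_id == project_id
    ).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return db.query(models.Attachment).filter(
        models.Attachment.task_id == task_id
    ).all()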
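The task routes authenticate the caller, but the path carries no workspace_id, so get_workspace_member cannot be applied to them directly. One possible way to enforce workspace membership here is to resolve it through the project, as sketched below with the standard-library `sqlite3` module and a cut-down, assumed schema. `role_in_project` is a hypothetical helper, not part of the project code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE projects (id INTEGER PRIMARY KEY, workspace_id INTEGER NOT NULL);
    CREATE TABLE workspace_members (
        workspace_id INTEGER NOT NULL,
        user_id INTEGER NOT NULL,
        role TEXT NOT NULL
    );
    INSERT INTO projects (id, workspace_id) VALUES (1, 100), (2, 200);
    INSERT INTO workspace_members (workspace_id, user_id, role)
    VALUES (100, 1, 'owner'), (100, 2, 'member'), (200, 3, 'owner');
""")


def role_in_project(conn, project_id, user_id):
    """Return the user's role in the workspace that owns the project, or None."""
    row = conn.execute(
        """
        SELECT m.role
        FROM projects AS p
        JOIN workspace_members AS m ON m.workspace_id = p.workspace_id
        WHERE p.id = ? AND m.user_id = ?
        """,
        (project_id, user_id),
    ).fetchone()
    return row[0] if row else None


print(role_in_project(conn, project_id=1, user_id=2))  # member of workspace 100
print(role_in_project(conn, project_id=2, user_id=2))  # not in workspace 200
```

In the API, a None result would translate into a 403 response before any task data is read or changed.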
routers/comments.py
# routers/comments.py
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session

import models
import schemas
from database import get_db
from dependencies import get_current_user
from utils import paginate

router = APIRouter(prefix="/tasks/{task_id}/comments", tags=["Comments"])


@router.post("", response_model=schemas.CommentResponse, status_code=201)
def create_comment(
    task_id: int,
    data: schemas.CommentCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    task = db.query(models.Task).filter(models.Task.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    comment = models.Comment(
        content=data.content,
        task_id=task_id,
        author_id=current_user.id
    )
    db.add(comment)
    db.commit()
    db.refresh(comment)
    return comment


@router.get("", response_model=schemas.PaginatedResponse[schemas.CommentResponse])
def get_comments(
    task_id: int,
    page: int = 1,
    per_page: int = 20,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    # Oldest comments first, so the thread reads top to bottom
    query = db.query(models.Comment).filter(
        models.Comment.task_id == task_id
    ).order_by(models.Comment.created_at.asc())
    return paginate(query, page, per_page)


@router.patch("/{comment_id}", response_model=schemas.CommentResponse)
def update_comment(
    task_id: int,
    comment_id: int,
    data: schemas.CommentUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()
    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")
    # Only the author may edit a comment
    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only edit your own comments")
    comment.content = data.content
    db.commit()
    db.refresh(comment)
    return comment


@router.delete("/{comment_id}", status_code=204)
def delete_comment(
    task_id: int,
    comment_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    comment = db.query(models.Comment).filter(
        models.Comment.id == comment_id,
        models.Comment.task_id == task_id
    ).first()
    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")
    # Only the author may delete a comment
    if comment.author_id != current_user.id:
        raise HTTPException(status_code=403, detail="Can only delete your own comments")
    db.delete(comment)
    db.commit()
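get_comments above hands page and per_page straight to paginate. The arithmetic such a helper typically performs can be sketched as follows. This is an illustration only: page_window is a hypothetical name, the clamping rules and the max_per_page cap of 100 are assumptions, and the project's real paginate in utils.py may behave differently.

```python
import math


def page_window(total, page=1, per_page=20, max_per_page=100):
    """Return (offset, limit, total_pages) for a 1-based page number."""
    page = max(page, 1)                             # treat 0 or negatives as page 1
    per_page = min(max(per_page, 1), max_per_page)  # keep the page size in a sane range
    offset = (page - 1) * per_page                  # rows to skip before this page
    total_pages = math.ceil(total / per_page) if total else 0
    return offset, per_page, total_pages


# 45 comments, 20 per page: page 3 starts after the first 40 rows
print(page_window(total=45, page=3, per_page=20))
```

With SQLAlchemy, the first two values map onto query.offset(offset).limit(limit). Clamping matters here because the route accepts any integer for page and per_page.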
Step 10 — main.py — Final Clean Entry Point
# main.py
import os
import time

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError

import models
from database import engine
from routers import auth, users, workspaces, projects, tasks, comments

# Create tables
models.Base.metadata.create_all(bind=engine)

# Create uploads folder
os.makedirs("uploads", exist_ok=True)

app = FastAPI(
    title="Task Manager API",
    description="""
A complete task management API built with FastAPI.

## Features
- JWT Authentication
- Workspaces with role-based access
- Projects inside workspaces
- Tasks with priorities and statuses
- Comments on tasks
- File attachments
- Background notifications
""",
    version="1.0.0",
    contact={
        "name": "Gagan",
        "email": "gagan@email.com"
    }
)

# ── CORS ──────────────────────────────────────────
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ── Request Logging Middleware ────────────────────
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
    print(f"[{request.method}] {request.url.path} → {response.status_code} ({duration}ms)")
    return response


# ── Exception Handlers ────────────────────────────
@app.exception_handler(RequestValidationError)
async def validation_handler(request: Request, exc: RequestValidationError):
    errors = [
        {
            "field": " → ".join(str(part) for part in e["loc"]),
            "message": e["msg"]
        }
        for e in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content={"success": False, "message": "Validation failed", "errors": errors}
    )


@app.exception_handler(404)
async def not_found_handler(request: Request, exc):
    # This handler also receives HTTPException(status_code=404) raised inside
    # routes, so keep their detail (e.g. "Task not found") instead of masking it
    detail = getattr(exc, "detail", None)
    if not detail or detail == "Not Found":
        detail = f"Route not found: {request.url.path}"
    return JSONResponse(
        status_code=404,
        content={"success": False, "message": detail}
    )


# ── Static Files ──────────────────────────────────
app.mount("/files", StaticFiles(directory="uploads"), name="files")

# ── Routers ───────────────────────────────────────
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(workspaces.router)
app.include_router(projects.router)
app.include_router(tasks.router)
app.include_router(comments.router)


# ── Root ──────────────────────────────────────────
@app.get("/", tags=["Root"])
def root():
    return {
        "message": "Task Manager API",
        "version": "1.0.0",
        "docs": "/docs",
        "redoc": "/redoc"
    }
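To see what the validation handler above returns, here is the same flattening logic as a standalone function applied to a hand-written error list. The sample data mimics the loc/msg shape of the entries from exc.errors(). format_errors is a hypothetical helper name, and the sample messages are made up for illustration.

```python
def format_errors(errors):
    """Flatten validation error dicts into {"field", "message"} pairs."""
    return [
        {
            "field": " → ".join(str(part) for part in err["loc"]),
            "message": err["msg"],
        }
        for err in errors
    ]


# Hand-written sample in the same shape as validation error entries
sample = [
    {"loc": ("body", "title"), "msg": "Field required"},
    {"loc": ("query", "page"), "msg": "Input should be a valid integer"},
]
for item in format_errors(sample):
    print(item)
```

Each entry tells the client where the problem is (body, query, or path) and which field failed, which is easier to show in a form than the raw error structure.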
Running the Final Project
uvicorn main:app --reload
Open http://localhost:8000/docs. You'll see the complete API organized into 6 sections, plus a small Root section for the / endpoint:
- Authentication
- Users
- Workspaces
- Projects
- Tasks
- Comments
Complete Testing Flow
Test this complete scenario in /docs:
1. Register two users:
- User A: gagan@email.com
- User B: rahul@email.com
2. Login as User A → get token → Authorize in /docs
3. Create a workspace:
{"name": "My Team", "description": "Our workspace"}
4. Invite User B to workspace:
{"email": "rahul@email.com", "role": "member"}
Watch terminal — notification email prints in background.
5. Create a project:
{"name": "Website Redesign", "description": "Q1 project"}
6. Create a task:
{
  "title": "Design homepage mockup",
  "description": "Create Figma designs for the new homepage",
  "priority": "high",
  "assignee_id": 2
}
Watch terminal — assignment notification prints.
7. Login as User B → add a comment on the task
8. Upload an attachment to the task
9. Update task status to in_progress
10. Mark task as done
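The same flow can be scripted instead of clicked through in /docs. As a minimal sketch using only the standard library, the helper below builds an authenticated JSON request for the task-creation step. build_request is a hypothetical helper, the base URL assumes uvicorn running locally on its default port, and the token value is a placeholder for the access token returned by login.

```python
import json
import urllib.request

API_URL = "http://localhost:8000"  # assumption: local uvicorn on the default port


def build_request(path, token, payload=None):
    """Build (but do not send) a JSON request carrying a Bearer token."""
    data = json.dumps(payload).encode("utf-8") if payload is not None else None
    request = urllib.request.Request(f"{API_URL}{path}", data=data)
    request.add_header("Authorization", f"Bearer {token}")
    if data is not None:
        request.add_header("Content-Type", "application/json")
    return request


req = build_request(
    "/projects/1/tasks",
    token="PASTE_TOKEN_HERE",  # placeholder, not a real token
    payload={"title": "Design homepage mockup", "priority": "high", "assignee_id": 2},
)
print(req.get_method(), req.full_url)
# To actually send it against a running server: urllib.request.urlopen(req)
```

A request with a payload is sent as POST and one without as GET, so the same helper covers both creating and listing tasks.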
What You've Built
A complete production-grade REST API with:
✅ Clean project structure — routers, models, schemas separated
✅ JWT authentication — register, login, protected routes
✅ Role-based access — owner, admin, member
✅ Full CRUD — workspaces, projects, tasks, comments
✅ Relationships — nested resources, foreign keys
✅ Pagination — on all list endpoints
✅ Search and filtering — tasks by status, priority, assignee
✅ File uploads — attachments on tasks
✅ Background tasks — email notifications
✅ CORS — ready for frontend connection
✅ Middleware — request logging
✅ Custom exception handlers — consistent errors
✅ Auto documentation — /docs and /redoc
Connecting to Your Next.js Frontend
Since you already build Next.js apps, connecting is straightforward:
// lib/api.js in your Next.js project
const API_URL = "http://localhost:8000"

export async function login(email, password) {
  const res = await fetch(`${API_URL}/auth/login`, {
    method: "POST",
    headers: {"Content-Type": "application/json"},
    body: JSON.stringify({email, password})
  })
  return res.json() // returns {access_token, user}
}

export async function getTasks(projectId, token) {
  const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
    headers: {"Authorization": `Bearer ${token}`}
  })
  return res.json()
}

export async function createTask(projectId, taskData, token) {
  const res = await fetch(`${API_URL}/projects/${projectId}/tasks`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${token}`
    },
    body: JSON.stringify(taskData)
  })
  return res.json()
}
This is the same Bearer token pattern you already use with NestJS, so there is nothing new to learn on the frontend side.
What's Next
You now know FastAPI end to end. Depending on where you want to go:
Immediate next steps for this project:
- Switch SQLite to PostgreSQL: change DATABASE_URL in .env and install a PostgreSQL driver such as psycopg2-binary
- Deploy to a VPS with Nginx + Uvicorn
- Add email sending with fastapi-mail
- Add WebSockets for real-time task updates
- Write tests with pytest
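When switching databases, one detail to watch is any SQLite-specific engine option. FastAPI tutorials commonly pass connect_args={"check_same_thread": False} to create_engine for SQLite, and that option is SQLite-only. Assuming database.py does something similar, a small helper could choose the options from the URL. engine_options is a hypothetical name, and the PostgreSQL URL below is a placeholder.

```python
def engine_options(database_url):
    """Return keyword arguments for create_engine based on the URL scheme."""
    if database_url.startswith("sqlite"):
        # SQLite connections are thread-bound by default; this relaxes the check
        return {"connect_args": {"check_same_thread": False}}
    return {}  # other backends: no SQLite-only options


print(engine_options("sqlite:///./taskmanager.db"))
print(engine_options("postgresql://user:password@localhost/taskmanager"))
# Usage (requires SQLAlchemy): create_engine(url, **engine_options(url))
```

With a helper like this, the switch stays close to a one-line change in .env, because the code no longer hardcodes a SQLite assumption.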
Deeper FastAPI topics:
- Alembic migrations — managing database schema changes
- WebSockets — real-time features
- Celery + Redis — heavy background job processing
- pytest — testing your API endpoints
- Docker — containerizing your app
Given your NestJS and PostgreSQL background — switching to PostgreSQL and deploying this to your existing Azure infrastructure would be the natural next step. The code is already ready for it.