FastAPI MongoDB Integration

Introduction

MongoDB is a popular NoSQL database that pairs well with FastAPI: it stores data as flexible, JSON-like documents, which map directly onto Python dictionaries and the JSON payloads an API sends and receives. In this tutorial, we'll integrate MongoDB with FastAPI to build a small CRUD API.

Unlike traditional relational databases, MongoDB does not require a fixed table schema, so documents in the same collection can evolve as your application does. Combined with FastAPI's async support and an async driver such as Motor, database calls don't block the event loop, so a single process can serve many concurrent requests.

Prerequisites

Before we begin, make sure you have:

  • Python 3.7+ installed
  • Basic knowledge of FastAPI
  • MongoDB installed locally or access to a MongoDB Atlas account
  • Familiarity with async/await in Python (helpful but not required)

Setting Up Your Environment

First, let's install the necessary packages:

bash
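pip install fastapi uvicorn pymongo motor python-decouple "pydantic[email]<2"

Here's what each package does:

  • fastapi: Our web framework for building APIs
  • uvicorn: ASGI server to run our FastAPI application
  • pymongo: Official MongoDB driver for Python
  • motor: Async MongoDB driver built on top of PyMongo
  • python-decouple: Reads settings such as the connection string from environment variables or a .env file
  • pydantic[email]: Installs email-validator, which the EmailStr field type requires; the version is pinned below 2 because the models in this tutorial use the Pydantic v1 API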

Project Structure

Let's organize our project with a clear structure:

mongodb_fastapi_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   └── models/
│       ├── __init__.py
│       └── student.py
├── requirements.txt
└── .env

Connecting to MongoDB

First, let's set up the database connection in database.py:

python
import motor.motor_asyncio
from bson import ObjectId
from decouple import config

# Load environment variables
MONGODB_URL = config("MONGODB_URL", default="mongodb://localhost:27017")
DATABASE_NAME = config("DATABASE_NAME", default="fastapi_db")

# Create a client instance
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)

# Get database instance
database = client[DATABASE_NAME]

# Helper functions for ObjectId conversion
def parse_object_id(id: str) -> ObjectId:
    return ObjectId(id)

def object_id_to_str(oid: ObjectId) -> str:
    return str(oid)
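
Because `ObjectId(...)` raises an error on malformed input, it helps to know what a well-formed ID string looks like. The sketch below is a dependency-free approximation of that check (24 hexadecimal characters). The name `looks_like_object_id` is hypothetical; in the application itself, `ObjectId.is_valid` from `bson` is the check to rely on.

```python
import re

# An ObjectId is 12 bytes, so its string form is exactly 24 hex characters.
_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value: str) -> bool:
    """Return True if value has the shape of an ObjectId hex string."""
    return isinstance(value, str) and _OBJECT_ID_RE.fullmatch(value) is not None
```

A check like this can reject a bad path parameter before any database call is made.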

Creating Data Models

Now, let's create our data model in models/student.py:

python
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime
from bson import ObjectId

# Custom ObjectId field for Pydantic
class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not isinstance(v, ObjectId):
            if not ObjectId.is_valid(v):
                raise ValueError("Invalid ObjectId")
            v = ObjectId(v)
        return v

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")

# Student input model
class StudentModel(BaseModel):
    name: str = Field(...)
    email: EmailStr = Field(...)
    course: str = Field(...)
    gpa: float = Field(..., ge=0.0, le=4.0)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "name": "John Doe",
                "email": "john.doe@example.com",
                "course": "Computer Science",
                "gpa": 3.5
            }
        }

# Student response model with ID
class StudentResponse(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    name: str
    email: EmailStr
    course: str
    gpa: float
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}

# Student update model
class UpdateStudentModel(BaseModel):
    name: Optional[str] = None
    email: Optional[EmailStr] = None
    course: Optional[str] = None
    gpa: Optional[float] = Field(None, ge=0.0, le=4.0)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "name": "Jane Doe",
                "email": "jane.doe@example.com",
                "course": "Data Science",
                "gpa": 3.8
            }
        }
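
Documents returned by the driver are plain dictionaries that contain `ObjectId` and `datetime` values, which is why the models above configure `json_encoders`. As a rough illustration of that conversion done by hand, here is a minimal sketch. `serialize_document` and `FakeObjectId` are hypothetical names; `FakeObjectId` only stands in for `bson.ObjectId` so the example runs without the driver installed.

```python
from datetime import datetime
from typing import Any, Dict


def serialize_document(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Return a JSON-friendly copy of a MongoDB-style document."""
    out: Dict[str, Any] = {}
    for key, value in doc.items():
        if key == "_id":
            out[key] = str(value)          # ObjectId -> hex string
        elif isinstance(value, datetime):
            out[key] = value.isoformat()   # datetime -> ISO 8601 string
        else:
            out[key] = value
    return out


class FakeObjectId:
    """Stand-in for bson.ObjectId, used only to keep this sketch dependency-free."""

    def __init__(self, hex_id: str) -> None:
        self._hex = hex_id

    def __str__(self) -> str:
        return self._hex


doc = {
    "_id": FakeObjectId("60d5ec9af682d67b8f3f57f4"),
    "name": "John Doe",
    "gpa": 3.5,
    "created_at": datetime(2023, 10, 25, 14, 30, 22),
}
serialized = serialize_document(doc)
```

In the application, Pydantic and FastAPI perform this step when a handler returns a document through a `response_model`.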

Creating the FastAPI Application

Now, let's build our main application file (main.py):

python
from datetime import datetime
from typing import List, Optional

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import FastAPI, HTTPException, Body, Response, status, Query
from fastapi.encoders import jsonable_encoder

from app.database import database, parse_object_id
from app.models.student import StudentModel, StudentResponse, UpdateStudentModel

app = FastAPI(
    title="FastAPI MongoDB Integration",
    description="A simple API demonstrating FastAPI integration with MongoDB",
    version="1.0.0"
)

# Collection name
COLLECTION_NAME = "students"
students_collection = database[COLLECTION_NAME]

def object_id_or_400(student_id: str) -> ObjectId:
    # Reject malformed IDs with 400 so that a missing document can still return 404
    try:
        return parse_object_id(student_id)
    except InvalidId:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid student ID format: {student_id}"
        )

@app.get("/")
async def root():
    return {"message": "Welcome to the FastAPI MongoDB integration example"}

# CREATE - Add a new student
@app.post("/students/", response_model=StudentResponse, status_code=status.HTTP_201_CREATED)
async def create_student(student: StudentModel = Body(...)):
    student_dict = jsonable_encoder(student)

    # Add timestamps
    now = datetime.now()
    student_dict["created_at"] = now
    student_dict["updated_at"] = now

    new_student = await students_collection.insert_one(student_dict)
    created_student = await students_collection.find_one({"_id": new_student.inserted_id})

    # Return the document itself; response_model serializes the ObjectId and datetimes
    return created_student

# READ - Get all students with optional filtering
@app.get("/students/", response_model=List[StudentResponse])
async def list_students(course: Optional[str] = Query(None), skip: int = 0, limit: int = 100):
    query = {}
    if course:
        query["course"] = course

    full_query = students_collection.find(query).skip(skip).limit(limit)
    results = await full_query.to_list(length=limit)

    return results

# READ - Get a single student by ID
@app.get("/students/{student_id}", response_model=StudentResponse)
async def get_student(student_id: str):
    student = await students_collection.find_one({"_id": object_id_or_400(student_id)})
    if student is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )
    return student

# UPDATE - Update a student
@app.put("/students/{student_id}", response_model=StudentResponse)
async def update_student(student_id: str, student_data: UpdateStudentModel = Body(...)):
    oid = object_id_or_400(student_id)

    # Get only non-null fields from the request
    student_dict = {k: v for k, v in student_data.dict().items() if v is not None}

    # Add updated timestamp
    student_dict["updated_at"] = datetime.now()

    # Execute update
    update_result = await students_collection.update_one(
        {"_id": oid},
        {"$set": student_dict}
    )

    # matched_count is 0 only when no document has this ID
    if update_result.matched_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )

    # Return updated document
    updated_student = await students_collection.find_one({"_id": oid})
    return updated_student

# DELETE - Remove a student
@app.delete("/students/{student_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_student(student_id: str):
    delete_result = await students_collection.delete_one({"_id": object_id_or_400(student_id)})

    if delete_result.deleted_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )

    # A 204 response must not carry a body
    return Response(status_code=status.HTTP_204_NO_CONTENT)
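
The update endpoint sends only the fields the client provided inside `$set`, so omitted fields keep their stored values. The following sketch isolates that step as a plain function so it can be tried without a database. `build_update_document` is a hypothetical helper name, and the `now` parameter exists only to make the result predictable.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def build_update_document(
    payload: Dict[str, Any], now: Optional[datetime] = None
) -> Dict[str, Dict[str, Any]]:
    """Build a `$set` update from a partial payload, skipping fields left as None."""
    changes = {key: value for key, value in payload.items() if value is not None}
    changes["updated_at"] = now or datetime.now()
    return {"$set": changes}


update = build_update_document(
    {"name": None, "email": None, "course": "Data Science", "gpa": 3.8},
    now=datetime(2023, 10, 25, 15, 0, 0),
)
```

One consequence of filtering on `None` is that a client cannot use this endpoint to clear a field. If that matters, Pydantic v1's `.dict(exclude_unset=True)` is one way to tell "not sent" apart from "sent as null".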

Running the Application

Create a .env file with your MongoDB connection details:

MONGODB_URL=mongodb://localhost:27017
DATABASE_NAME=fastapi_students
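
To make the role of this file concrete, here is a deliberately simplified sketch of how `KEY=value` lines become settings with fallbacks. `parse_dotenv` is a hypothetical function written for illustration only; in the application, python-decouple's `config()` does the reading and also handles cases this sketch ignores, such as type casting.

```python
from typing import Dict


def parse_dotenv(text: str) -> Dict[str, str]:
    """Parse KEY=value lines, ignoring blank lines and # comments."""
    settings: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # split on the first "=" only
        settings[key.strip()] = value.strip()
    return settings


env_text = """
# MongoDB connection details
MONGODB_URL=mongodb://localhost:27017
DATABASE_NAME=fastapi_students
"""

settings = parse_dotenv(env_text)
# Fall back to the same defaults that database.py uses
mongodb_url = settings.get("MONGODB_URL", "mongodb://localhost:27017")
database_name = settings.get("DATABASE_NAME", "fastapi_db")
```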

Now, let's run our application:

bash
uvicorn app.main:app --reload

Visit http://localhost:8000/docs to see the Swagger UI documentation and interact with your API.

Testing Our API

Let's test our API using the interactive Swagger UI or with curl commands:

Create a new student

Request:

bash
curl -X 'POST' \
  'http://localhost:8000/students/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "John Doe",
    "email": "john.doe@example.com",
    "course": "Computer Science",
    "gpa": 3.5
  }'

Response:

json
{
  "_id": "60d5ec9af682d67b8f3f57f4",
  "name": "John Doe",
  "email": "john.doe@example.com",
  "course": "Computer Science",
  "gpa": 3.5,
  "created_at": "2023-10-25T14:30:22.123456",
  "updated_at": "2023-10-25T14:30:22.123456"
}

Get all students

Request:

bash
curl -X 'GET' 'http://localhost:8000/students/' -H 'accept: application/json'

Response:

json
[
  {
    "_id": "60d5ec9af682d67b8f3f57f4",
    "name": "John Doe",
    "email": "john.doe@example.com",
    "course": "Computer Science",
    "gpa": 3.5,
    "created_at": "2023-10-25T14:30:22.123456",
    "updated_at": "2023-10-25T14:30:22.123456"
  }
]
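
The same requests can be issued from Python. The sketch below builds the "create student" request with the standard library's `urllib`. `build_create_request` is a hypothetical helper, and `BASE_URL` assumes the default uvicorn host and port used above. Sending the request needs the API to be running, so that part is left as a comment.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host and port


def build_create_request(student: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /students/ request with a JSON body."""
    body = json.dumps(student).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/students/",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


request = build_create_request(
    {
        "name": "John Doe",
        "email": "john.doe@example.com",
        "course": "Computer Science",
        "gpa": 3.5,
    }
)

# With the API running, the request could be sent like this:
# with urllib.request.urlopen(request) as response:
#     created = json.load(response)
```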

Real-World Applications

This simple student API can be extended for various real-world applications:

Pagination Implementation

For large datasets, implement proper pagination:

python
@app.get("/students/paginated/", response_model=dict)
async def paginated_students(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100)
):
    skip = (page - 1) * page_size

    # Get data for current page; sorting by _id keeps page boundaries stable
    cursor = students_collection.find().sort("_id", 1).skip(skip).limit(page_size)
    students = await cursor.to_list(length=page_size)

    # ObjectId is not JSON serializable, so convert it before returning a plain dict
    for student in students:
        student["_id"] = str(student["_id"])

    # Get total count for pagination info
    total = await students_collection.count_documents({})

    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": (total + page_size - 1) // page_size,
        "data": students
    }
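
The arithmetic behind `skip` and `total_pages` is easy to get off by one, so it can be worth checking in isolation. This is a minimal sketch of the same calculation; `page_window` is a hypothetical helper name.

```python
from typing import Dict


def page_window(page: int, page_size: int, total: int) -> Dict[str, int]:
    """Compute skip/limit values and the page count for 1-based pagination."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    skip = (page - 1) * page_size
    total_pages = (total + page_size - 1) // page_size  # ceiling division
    return {"skip": skip, "limit": page_size, "total_pages": total_pages}


# Page 3 of 25 documents, 10 per page: skip the first 20, 3 pages overall
window = page_window(page=3, page_size=10, total=25)
```

Note that `skip` still makes the server walk past the skipped documents, so very deep pages get slower as the offset grows.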

Text Search Implementation

Enable powerful text search capabilities:

python
@app.get("/students/search/", response_model=List[StudentResponse])
async def search_students(q: str = Query(..., min_length=2)):
    # Ensure the text index exists. This is a no-op when the index is already
    # there, but it still costs a round trip on every request, so in production
    # create the index once at application startup instead.
    await students_collection.create_index([("name", "text"), ("course", "text")])

    # Perform text search
    results = await students_collection.find(
        {"$text": {"$search": q}}
    ).to_list(length=100)

    return results
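
Results of a `$text` query are not ordered by relevance unless you ask for it. MongoDB exposes the relevance score through `{"$meta": "textScore"}`, which can be projected and sorted on. The sketch below only builds the three pieces involved; `build_text_search` is a hypothetical helper, and the commented lines show roughly how they would be passed to the collection.

```python
from typing import Any, Dict, List, Tuple


def build_text_search(
    q: str,
) -> Tuple[Dict[str, Any], Dict[str, Any], List[Tuple[str, Any]]]:
    """Return (filter, projection, sort) for a relevance-ordered text search."""
    query_filter = {"$text": {"$search": q}}
    projection = {"score": {"$meta": "textScore"}}   # adds a "score" field
    sort_spec = [("score", {"$meta": "textScore"})]  # most relevant first
    return query_filter, projection, sort_spec


query_filter, projection, sort_spec = build_text_search("computer science")

# Inside an endpoint these would be used roughly as:
# cursor = students_collection.find(query_filter, projection).sort(sort_spec)
# results = await cursor.to_list(length=100)
```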

Aggregation Pipeline Example

Let's calculate statistics about students per course:

python
from fastapi.responses import JSONResponse

@app.get("/statistics/courses/")
async def course_statistics():
    pipeline = [
        {"$group": {
            "_id": "$course",
            "student_count": {"$sum": 1},
            "average_gpa": {"$avg": "$gpa"},
            "max_gpa": {"$max": "$gpa"},
            "min_gpa": {"$min": "$gpa"}
        }},
        {"$sort": {"student_count": -1}}
    ]

    results = await students_collection.aggregate(pipeline).to_list(length=100)
    return JSONResponse(content=results)
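
If the pipeline syntax is unfamiliar, it may help to see the same computation in plain Python. This sketch mirrors the `$group` and `$sort` stages on an in-memory list; `course_stats` and the sample data are made up for illustration. In the application, the database does this work so the documents never have to be loaded into the API process.

```python
from collections import defaultdict
from typing import Any, Dict, List


def course_stats(students: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group students by course and compute count, average, max, and min GPA."""
    gpas_by_course: Dict[str, List[float]] = defaultdict(list)
    for student in students:
        gpas_by_course[student["course"]].append(student["gpa"])

    stats = [
        {
            "_id": course,                       # the $group key
            "student_count": len(gpas),          # {"$sum": 1}
            "average_gpa": sum(gpas) / len(gpas),
            "max_gpa": max(gpas),
            "min_gpa": min(gpas),
        }
        for course, gpas in gpas_by_course.items()
    ]
    # Mirror {"$sort": {"student_count": -1}}
    return sorted(stats, key=lambda row: row["student_count"], reverse=True)


sample = [
    {"course": "Computer Science", "gpa": 3.5},
    {"course": "Computer Science", "gpa": 3.0},
    {"course": "Data Science", "gpa": 4.0},
]
stats = course_stats(sample)
```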

Best Practices for MongoDB with FastAPI

  1. Use Indexes: Create appropriate indexes on fields that you frequently query to improve performance.
python
# Create indexes during application startup
@app.on_event("startup")
async def create_indexes():
    await students_collection.create_index("email", unique=True)
    await students_collection.create_index([("name", 1), ("course", 1)])
    # Text index for search
    await students_collection.create_index([("name", "text"), ("course", "text")])

  2. Connection Management: Properly manage your MongoDB connections by closing the client when the application shuts down.
python
from app.database import client

@app.on_event("shutdown")
async def shutdown_db_client():
    client.close()

  3. Data Validation: Always validate incoming data using Pydantic models.

  4. Error Handling: Implement comprehensive error handling, especially for database operations.

  5. Environment Variables: Never hardcode sensitive information like connection strings.
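
Recent FastAPI releases deprecate `@app.on_event` in favor of a single `lifespan` context manager that covers both startup and shutdown. The sketch below shows the shape of that pattern using only the standard library. `StubClient` is a hypothetical stand-in for the Motor client so the example runs without MongoDB, and the `events` list exists only to make the order of operations visible.

```python
import asyncio
from contextlib import asynccontextmanager


class StubClient:
    """Stand-in for AsyncIOMotorClient, used only to keep this sketch runnable."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


client = StubClient()
events = []


@asynccontextmanager
async def lifespan(app):
    # Startup work (for example, the create_index calls) goes before the yield
    events.append("startup")
    try:
        yield
    finally:
        # Shutdown work goes after the yield and runs even if serving fails
        client.close()
        events.append("shutdown")


async def demo() -> None:
    # With FastAPI installed, this would instead be: app = FastAPI(lifespan=lifespan)
    async with lifespan(app=None):
        events.append("serving")


asyncio.run(demo())
```

Keeping startup and shutdown in one function makes it harder to open a resource without also closing it.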

Summary

In this tutorial, we've covered:

  1. Setting up a FastAPI project with MongoDB integration using Motor
  2. Creating Pydantic models for data validation
  3. Implementing CRUD operations (Create, Read, Update, Delete)
  4. Adding real-world examples like pagination, text search, and aggregation
  5. Best practices for FastAPI and MongoDB integration

MongoDB's document model pairs wonderfully with FastAPI's JSON-based nature, creating a powerful combination for building modern web APIs. The asynchronous nature of both FastAPI and Motor allows your application to handle many concurrent requests efficiently.

Further Exercises

  1. Implement Authentication: Add JWT authentication to secure your API endpoints.
  2. Add Relationship Handling: Create endpoints that handle relationships between collections (e.g., students and courses).
  3. File Uploads: Implement a feature to upload student profile pictures using GridFS.
  4. Logging Middleware: Create middleware that logs all database operations for debugging.
  5. Caching Layer: Implement Redis caching to reduce database load for frequently accessed data.

Happy coding with FastAPI and MongoDB!


