FastAPI MongoDB Integration
Introduction
MongoDB is a popular NoSQL database that works exceptionally well with FastAPI applications due to its flexible document-based structure and JSON-like data format. In this tutorial, we'll explore how to integrate MongoDB with FastAPI to create robust and scalable web applications.
Unlike traditional relational databases, MongoDB stores data as documents that map directly onto Python dictionaries, making it a natural fit for Python applications. Combined with FastAPI's async support and an async driver such as Motor, your API can keep serving other requests while a database call is in flight.
Prerequisites
Before we begin, make sure you have:
- Python 3.7+ installed
- Basic knowledge of FastAPI
- MongoDB installed locally or access to a MongoDB Atlas account
- Familiarity with async/await in Python (helpful but not required)
Setting Up Your Environment
First, let's install the necessary packages:
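pip install fastapi uvicorn pymongo motor python-decouple "pydantic[email]<2"
Here's what each package does:
- fastapi: Our web framework for building APIs
- uvicorn: ASGI server to run our FastAPI application
- pymongo: Official MongoDB driver for Python
- motor: Async MongoDB driver built on top of PyMongo
- python-decouple: Reads settings such as the connection string from the .env file
- pydantic[email]: Installs the email validator that EmailStr needs; the models in this tutorial use the Pydantic v1 API, hence the <2 pin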
Project Structure
Let's organize our project with a clear structure:
mongodb_fastapi_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   └── models/
│       ├── __init__.py
│       └── student.py
├── requirements.txt
└── .env
Connecting to MongoDB
First, let's set up the database connection in database.py:
import motor.motor_asyncio
from bson import ObjectId
from decouple import config

# Load environment variables
MONGODB_URL = config("MONGODB_URL", default="mongodb://localhost:27017")
DATABASE_NAME = config("DATABASE_NAME", default="fastapi_db")

# Create a client instance
client = motor.motor_asyncio.AsyncIOMotorClient(MONGODB_URL)

# Get database instance
database = client[DATABASE_NAME]


# Helper functions for ObjectId conversion
def parse_object_id(id: str) -> ObjectId:
    return ObjectId(id)


def object_id_to_str(oid: ObjectId) -> str:
    return str(oid)
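Because ObjectId(id) raises an exception on malformed input, it can help to check the shape of an ID string before querying. The following is a minimal, standard-library-only sketch, assuming the ID arrives as the usual 24-character hexadecimal string. The helper name looks_like_object_id is hypothetical and not part of the tutorial's code; in the real application, bson's ObjectId.is_valid is the authoritative check.

```python
import re

# An ObjectId is 12 bytes, normally written as 24 hexadecimal characters.
_OBJECT_ID_RE = re.compile(r"[0-9a-fA-F]{24}")


def looks_like_object_id(value) -> bool:
    """Return True if value has the 24-hex-character shape of an ObjectId string."""
    return isinstance(value, str) and _OBJECT_ID_RE.fullmatch(value) is not None


print(looks_like_object_id("60d5ec9af682d67b8f3f57f4"))  # True
print(looks_like_object_id("not-an-id"))                 # False
```

A check like this lets an endpoint reject a bad ID with a 400 response before any database call is made.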
Creating Data Models
Now, let's create our data model in models/student.py:
from pydantic import BaseModel, Field, EmailStr
from typing import Optional
from datetime import datetime
from bson import ObjectId


# Custom ObjectId field for Pydantic
class PyObjectId(ObjectId):
    @classmethod
    def __get_validators__(cls):
        yield cls.validate

    @classmethod
    def validate(cls, v):
        if not isinstance(v, ObjectId):
            if not ObjectId.is_valid(v):
                raise ValueError("Invalid ObjectId")
            v = ObjectId(v)
        return v

    @classmethod
    def __modify_schema__(cls, field_schema):
        field_schema.update(type="string")
# Student input model
class StudentModel(BaseModel):
    name: str = Field(...)
    email: EmailStr = Field(...)
    course: str = Field(...)
    gpa: float = Field(..., ge=0.0, le=4.0)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "name": "John Doe",
                "email": "john.doe@example.com",
                "course": "Computer Science",
                "gpa": 3.5
            }
        }
# Student response model with ID
class StudentResponse(BaseModel):
    id: PyObjectId = Field(default_factory=PyObjectId, alias="_id")
    name: str
    email: EmailStr
    course: str
    gpa: float
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
# Student update model
class UpdateStudentModel(BaseModel):
    name: Optional[str] = None
    email: Optional[EmailStr] = None
    course: Optional[str] = None
    gpa: Optional[float] = Field(None, ge=0.0, le=4.0)

    class Config:
        allow_population_by_field_name = True
        arbitrary_types_allowed = True
        json_encoders = {ObjectId: str}
        schema_extra = {
            "example": {
                "name": "Jane Doe",
                "email": "jane.doe@example.com",
                "course": "Data Science",
                "gpa": 3.8
            }
        }
Creating the FastAPI Application
Now, let's build our main application file (main.py):
from datetime import datetime
from typing import List, Optional

from bson import ObjectId
from bson.errors import InvalidId
from fastapi import FastAPI, HTTPException, Body, Query, Response, status
from fastapi.encoders import jsonable_encoder

from app.database import database, parse_object_id
from app.models.student import StudentModel, StudentResponse, UpdateStudentModel

app = FastAPI(
    title="FastAPI MongoDB Integration",
    description="A simple API demonstrating FastAPI integration with MongoDB",
    version="1.0.0"
)

# Collection name
COLLECTION_NAME = "students"
students_collection = database[COLLECTION_NAME]


def to_object_id(student_id: str) -> ObjectId:
    # Only a malformed ID becomes a 400; other errors (including 404s) are not swallowed
    try:
        return parse_object_id(student_id)
    except (InvalidId, TypeError) as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid student ID format: {e}"
        )


@app.get("/")
async def root():
    return {"message": "Welcome to the FastAPI MongoDB integration example"}


# CREATE - Add a new student
@app.post("/students/", response_model=StudentResponse, status_code=status.HTTP_201_CREATED)
async def create_student(student: StudentModel = Body(...)):
    student_dict = jsonable_encoder(student)
    # Add timestamps (a single value, so both fields match on creation)
    now = datetime.now()
    student_dict["created_at"] = now
    student_dict["updated_at"] = now
    new_student = await students_collection.insert_one(student_dict)
    created_student = await students_collection.find_one({"_id": new_student.inserted_id})
    # Return the document itself: response_model converts the ObjectId and datetime
    # values, which a raw JSONResponse cannot serialize
    return created_student


# READ - Get all students with optional filtering
@app.get("/students/", response_model=List[StudentResponse])
async def list_students(course: Optional[str] = Query(None), skip: int = 0, limit: int = 100):
    query = {}
    if course:
        query["course"] = course
    full_query = students_collection.find(query).skip(skip).limit(limit)
    results = await full_query.to_list(length=limit)
    return results


# READ - Get a single student by ID
@app.get("/students/{student_id}", response_model=StudentResponse)
async def get_student(student_id: str):
    student = await students_collection.find_one({"_id": to_object_id(student_id)})
    if student is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )
    return student


# UPDATE - Update a student
@app.put("/students/{student_id}", response_model=StudentResponse)
async def update_student(student_id: str, student_data: UpdateStudentModel = Body(...)):
    oid = to_object_id(student_id)
    # Get only non-null fields from the request
    student_dict = {k: v for k, v in student_data.dict().items() if v is not None}
    # Add updated timestamp
    student_dict["updated_at"] = datetime.now()
    # Execute update
    update_result = await students_collection.update_one(
        {"_id": oid},
        {"$set": student_dict}
    )
    # matched_count is 0 only when no document has this ID
    if update_result.matched_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )
    # Return updated document
    updated_student = await students_collection.find_one({"_id": oid})
    return updated_student


# DELETE - Remove a student
@app.delete("/students/{student_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_student(student_id: str):
    delete_result = await students_collection.delete_one({"_id": to_object_id(student_id)})
    if delete_result.deleted_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Student with ID {student_id} not found"
        )
    # A 204 response must not carry a body
    return Response(status_code=status.HTTP_204_NO_CONTENT)
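The partial-update logic in the PUT handler (keep only the fields the client actually sent, then stamp updated_at) can be isolated and tested without a database. This is a minimal sketch; the function name build_set_document and its now parameter are hypothetical, and the use of a UTC timestamp is an assumption that differs from the tutorial's naive datetime.now().

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_set_document(changes: Dict[str, Any], now: Optional[datetime] = None) -> Dict[str, Any]:
    """Build a MongoDB-style {"$set": ...} document from the non-None fields."""
    # Compare against None rather than truthiness so a GPA of 0.0 is kept.
    fields = {key: value for key, value in changes.items() if value is not None}
    # `now` is injectable so tests can pass a fixed timestamp (assumption: UTC by default).
    fields["updated_at"] = now or datetime.now(timezone.utc)
    return {"$set": fields}


payload = {"name": None, "email": None, "course": "Data Science", "gpa": 3.8}
print(build_set_document(payload, now=datetime(2023, 10, 25, 14, 30)))
```

Filtering on "is not None" means a client cannot use this endpoint to clear a field; that matches the behavior of the handler above.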
Running the Application
Create a .env file with your MongoDB connection details:
MONGODB_URL=mongodb://localhost:27017
DATABASE_NAME=fastapi_students
Now, let's run our application:
uvicorn app.main:app --reload
Visit http://localhost:8000/docs to see the Swagger UI documentation and interact with your API.
Testing Our API
Let's test our API using the interactive Swagger UI or with curl commands:
Create a new student
Request:
curl -X 'POST' \
  'http://localhost:8000/students/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "John Doe",
    "email": "john.doe@example.com",
    "course": "Computer Science",
    "gpa": 3.5
  }'
Response:
{
  "_id": "60d5ec9af682d67b8f3f57f4",
  "name": "John Doe",
  "email": "john.doe@example.com",
  "course": "Computer Science",
  "gpa": 3.5,
  "created_at": "2023-10-25T14:30:22.123456",
  "updated_at": "2023-10-25T14:30:22.123456"
}
Get all students
Request:
curl -X 'GET' 'http://localhost:8000/students/' -H 'accept: application/json'
Response:
[
  {
    "_id": "60d5ec9af682d67b8f3f57f4",
    "name": "John Doe",
    "email": "john.doe@example.com",
    "course": "Computer Science",
    "gpa": 3.5,
    "created_at": "2023-10-25T14:30:22.123456",
    "updated_at": "2023-10-25T14:30:22.123456"
  }
]
Real-World Applications
This simple student API can be extended for various real-world applications:
Pagination Implementation
For large datasets, implement proper pagination:
@app.get("/students/paginated/", response_model=dict)
async def paginated_students(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100)
):
    skip = (page - 1) * page_size
    # Get data for current page; sort by _id so the page order is stable
    cursor = students_collection.find().sort("_id", 1).skip(skip).limit(page_size)
    students = await cursor.to_list(length=page_size)
    # ObjectId is not JSON serializable, so convert it before returning a plain dict
    for student in students:
        student["_id"] = str(student["_id"])
    # Get total count for pagination info
    total = await students_collection.count_documents({})
    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": (total + page_size - 1) // page_size,
        "data": students
    }
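The skip and total_pages arithmetic used above is easy to get off by one, so here is a small standalone sketch of it that can be unit tested without MongoDB. The function name page_window is hypothetical.

```python
from typing import Dict


def page_window(page: int, page_size: int, total: int) -> Dict[str, int]:
    """Compute skip/limit values and the total page count for 1-based pages."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    skip = (page - 1) * page_size
    # Ceiling division without floats: 25 items at 10 per page gives 3 pages.
    total_pages = (total + page_size - 1) // page_size
    return {"skip": skip, "limit": page_size, "total_pages": total_pages}


print(page_window(page=3, page_size=10, total=25))
# {'skip': 20, 'limit': 10, 'total_pages': 3}
```

Note that an empty collection yields total_pages of 0, so a client asking for page 1 simply receives an empty data list.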
Text Search Implementation
Enable powerful text search capabilities:
@app.get("/students/search/", response_model=List[StudentResponse])
async def search_students(q: str = Query(..., min_length=2)):
    # Create text index if it doesn't exist. This is a no-op when the index is
    # already there, but it costs a round trip per request, so creating it once
    # at application startup is preferable.
    await students_collection.create_index([("name", "text"), ("course", "text")])
    # Perform text search
    results = await students_collection.find(
        {"$text": {"$search": q}}
    ).to_list(length=100)
    return results
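Text search can be combined with the exact-match course filter from the list endpoint by putting both conditions in one filter document. The sketch below only builds that document, so it runs without a database; build_student_filter is a hypothetical helper, and the resulting dictionary is what you would pass to students_collection.find().

```python
from typing import Any, Dict, Optional


def build_student_filter(course: Optional[str] = None, q: Optional[str] = None) -> Dict[str, Any]:
    """Combine an exact course match with a $text search into one filter document."""
    query: Dict[str, Any] = {}
    if course:
        query["course"] = course
    if q:
        # $text requires a text index on the collection.
        query["$text"] = {"$search": q}
    return query


print(build_student_filter(course="Computer Science", q="john"))
```

With neither argument supplied the filter is empty, which matches every document.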
Aggregation Pipeline Example
Let's calculate statistics about students per course:
from fastapi.responses import JSONResponse


@app.get("/statistics/courses/")
async def course_statistics():
    pipeline = [
        {"$group": {
            "_id": "$course",
            "student_count": {"$sum": 1},
            "average_gpa": {"$avg": "$gpa"},
            "max_gpa": {"$max": "$gpa"},
            "min_gpa": {"$min": "$gpa"}
        }},
        {"$sort": {"student_count": -1}}
    ]
    results = await students_collection.aggregate(pipeline).to_list(length=100)
    return JSONResponse(content=results)
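To see what the $group and $sort stages compute, here is an in-memory Python equivalent run over a few sample documents. It is only an illustration of the result shape, not how MongoDB executes the pipeline; the function name and the sample data are made up for this sketch.

```python
from collections import defaultdict
from typing import Any, Dict, List


def course_statistics_in_memory(students: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group by course and compute count, average, max, and min GPA."""
    gpas_by_course: Dict[str, List[float]] = defaultdict(list)
    for student in students:
        gpas_by_course[student["course"]].append(student["gpa"])

    stats = [
        {
            "_id": course,
            "student_count": len(gpas),
            "average_gpa": sum(gpas) / len(gpas),
            "max_gpa": max(gpas),
            "min_gpa": min(gpas),
        }
        for course, gpas in gpas_by_course.items()
    ]
    # Mirror {"$sort": {"student_count": -1}}: largest courses first.
    return sorted(stats, key=lambda row: row["student_count"], reverse=True)


sample = [
    {"course": "Computer Science", "gpa": 3.5},
    {"course": "Computer Science", "gpa": 2.5},
    {"course": "Data Science", "gpa": 4.0},
]
for row in course_statistics_in_memory(sample):
    print(row)
```

Running the grouping in the database, as the endpoint does, avoids transferring every student document to the application just to count and average them.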
Best Practices for MongoDB with FastAPI
- Use Indexes: Create appropriate indexes on fields that you frequently query to improve performance.
# Create indexes during application startup
@app.on_event("startup")
async def create_indexes():
    await students_collection.create_index("email", unique=True)
    await students_collection.create_index([("name", 1), ("course", 1)])
    # Text index for search
    await students_collection.create_index([("name", "text"), ("course", "text")])
- Connection Management: Properly manage your MongoDB connections.
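# main.py imports only `database` above, so import the client as well
from app.database import client


@app.on_event("shutdown")
async def shutdown_db_client():
    client.close()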
- Data Validation: Always validate incoming data using Pydantic models.
- Error Handling: Implement comprehensive error handling, especially for database operations.
- Environment Variables: Never hardcode sensitive information like connection strings.
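As an illustration of the environment-variable advice, the sketch below reads the two settings used in this tutorial with only the standard library. It is an assumption-laden alternative to the decouple-based code in database.py: load_mongo_settings is a hypothetical helper, and unlike decouple it does not read a .env file, only variables already present in the process environment.

```python
import os
from typing import Dict, Mapping, Optional


def load_mongo_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Read MongoDB settings from the environment, falling back to local defaults."""
    # Accept an explicit mapping so the function can be tested without touching os.environ.
    source = os.environ if env is None else env
    return {
        "url": source.get("MONGODB_URL", "mongodb://localhost:27017"),
        "database": source.get("DATABASE_NAME", "fastapi_db"),
    }


print(load_mongo_settings({"DATABASE_NAME": "fastapi_students"}))
```

Keeping the defaults pointed at a local, unauthenticated instance means nothing sensitive ends up in source control; production credentials come only from the environment.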
Summary
In this tutorial, we've covered:
- Setting up a FastAPI project with MongoDB integration using Motor
- Creating Pydantic models for data validation
- Implementing CRUD operations (Create, Read, Update, Delete)
- Adding real-world examples like pagination, text search, and aggregation
- Best practices for FastAPI and MongoDB integration
MongoDB's document model pairs wonderfully with FastAPI's JSON-based nature, creating a powerful combination for building modern web APIs. The asynchronous nature of both FastAPI and Motor allows your application to handle many concurrent requests efficiently.
Further Exercises
- Implement Authentication: Add JWT authentication to secure your API endpoints.
- Add Relationship Handling: Create endpoints that handle relationships between collections (e.g., students and courses).
- File Uploads: Implement a feature to upload student profile pictures using GridFS.
- Logging Middleware: Create middleware that logs all database operations for debugging.
- Caching Layer: Implement Redis caching to reduce database load for frequently accessed data.
Additional Resources
- FastAPI Official Documentation
- Motor Documentation
- MongoDB Documentation
- MongoDB University - Free courses on MongoDB
- Pydantic Documentation
Happy coding with FastAPI and MongoDB!