FastAPI Database Queries

Introduction

Most web applications need to read from and write to a database. FastAPI does not include a database layer of its own, but its dependency-injection system pairs well with SQLAlchemy, a SQL toolkit and Object-Relational Mapping (ORM) library for Python. This guide walks you through the common database queries you will write in a FastAPI application.

By the end of this tutorial, you'll understand how to:

  • Set up database connections in FastAPI
  • Perform CRUD operations (Create, Read, Update, Delete)
  • Implement filtering, sorting, and pagination
  • Handle database relationships
  • Optimize your database queries

Prerequisites

Before we dive in, ensure you have:

  • Basic knowledge of FastAPI
  • Python 3.7+ installed
  • Understanding of SQL fundamentals
  • Familiarity with async programming concepts

Setting Up the Database Connection

First, let's set up a database connection using SQLAlchemy with FastAPI:

python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine
# SQLAlchemy 1.4+; older releases import declarative_base from sqlalchemy.ext.declarative
from sqlalchemy.orm import declarative_base, sessionmaker, Session

# Database URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# For PostgreSQL: "postgresql://user:password@localhost/dbname"

# Create SQLAlchemy engine
# check_same_thread is a SQLite-only option; omit connect_args for other databases
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)

# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create Base class
Base = declarative_base()

app = FastAPI()

# Dependency to get DB session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        # Always close the session, even if the request handler raised
        db.close()
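
The `try`/`finally` around `yield` is what guarantees that the session is closed. The following is a minimal sketch of that lifecycle in plain Python, with no FastAPI or SQLAlchemy involved. `FakeSession`, `handle_request`, and `failing_handler` are hypothetical stand-ins (for a real session, for the framework code that drives the dependency, and for an endpoint that raises); they only illustrate that cleanup runs on both the success and the error path.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy Session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


created = []  # every session handed out, kept so it can be inspected afterwards


def get_db():
    db = FakeSession()
    created.append(db)
    try:
        yield db
    finally:
        db.close()  # runs on normal exit and when an exception propagates


def handle_request(handler):
    """Drive the generator roughly the way a framework would (illustrative only)."""
    with contextmanager(get_db)() as db:
        return handler(db)


def failing_handler(db):
    raise ValueError("handler failed")


handle_request(lambda db: "ok")
try:
    handle_request(failing_handler)
except ValueError:
    pass

# Both sessions were closed, including the one whose handler raised
print([session.closed for session in created])
```

While the handler runs, the session is still open; it is closed only after the handler returns or raises.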

Defining Database Models

Let's create some models to work with:

python
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    items = relationship("Item", back_populates="owner")

class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    description = Column(String, index=True)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

Now, let's create the tables in the database:

python
# Create tables in the database
Base.metadata.create_all(bind=engine)

Basic CRUD Operations

Create Operation

Let's start with creating a new user:

python
from pydantic import BaseModel

# Pydantic model for request data validation
class UserCreate(BaseModel):
    email: str
    password: str

# Create a new user
@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    # Check if user already exists
    db_user = db.query(User).filter(User.email == user.email).first()
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create a new User instance
    db_user = User(email=user.email, hashed_password=user.password)  # In real apps, hash the password!

    # Add to session and commit
    db.add(db_user)
    db.commit()
    db.refresh(db_user)  # Refresh to get generated id

    # Return only public fields; returning db_user directly would also expose hashed_password
    return {"id": db_user.id, "email": db_user.email, "is_active": db_user.is_active}

Input:

json
{
  "email": "[email protected]",
  "password": "secretpassword"
}

Output:

json
{
  "id": 1,
  "email": "[email protected]",
  "is_active": true
}
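
The create endpoint stores the password unchanged and only notes that real applications should hash it. As one possible way to do that, here is a minimal sketch using PBKDF2 from the Python standard library. The function names `hash_password` and `verify_password`, the `iterations$salt$digest` storage format, and the iteration count are all assumptions made for illustration; they are not part of the original tutorial, and many projects use a dedicated password-hashing library instead.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor for this sketch; tune it for your hardware


def hash_password(password: str) -> str:
    """Derive a salted hash and pack everything needed for verification into one string."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = hash_password("secretpassword")
print(verify_password("secretpassword", stored))
print(verify_password("wrong-password", stored))
```

With helpers like these, `create_user` would pass `hashed_password=hash_password(user.password)` instead of the raw password.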

Read Operations

Get a single user by ID

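python
# Pydantic model for the response.
# Note: this schema shares its name with the SQLAlchemy User model. If both live
# in one module, the later definition shadows the earlier one and db.query(User)
# breaks, so keep schemas in their own module or rename one of the two classes.
class User(BaseModel):
    id: int
    email: str
    is_active: bool

    class Config:
        orm_mode = True  # Pydantic v1 setting; Pydantic v2 calls it from_attributes

@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user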

Get all users

python
from typing import List

@app.get("/users/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # Order by primary key so that skip/limit return stable pages between requests
    users = db.query(User).order_by(User.id).offset(skip).limit(limit).all()
    return users

Output:

json
[
  {
    "id": 1,
    "email": "[email protected]",
    "is_active": true
  },
  {
    "id": 2,
    "email": "[email protected]",
    "is_active": true
  }
]

Update Operation

python
from typing import Optional

class UserUpdate(BaseModel):
    # Both fields are optional so that clients can send a partial update
    email: Optional[str] = None
    is_active: Optional[bool] = None

@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Update only the attributes the client actually sent
    for key, value in user.dict(exclude_unset=True).items():
        setattr(db_user, key, value)

    db.commit()
    db.refresh(db_user)
    return db_user

Input:

json
{
  "email": "[email protected]",
  "is_active": false
}

Output:

json
{
  "id": 1,
  "email": "[email protected]",
  "is_active": false
}
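
The update endpoint relies on `exclude_unset=True` to tell "field not sent" apart from "field sent as null". The sketch below isolates that behaviour. `changed_fields` is a hypothetical helper introduced here for illustration; it falls back to `.dict()` when `.model_dump()` (the Pydantic v2 name) is not available, on the assumption that you may be running either major version.

```python
from typing import Optional

from pydantic import BaseModel


class UserUpdate(BaseModel):
    email: Optional[str] = None
    is_active: Optional[bool] = None


def changed_fields(update: BaseModel) -> dict:
    """Return only the fields that were explicitly provided when the model was built."""
    # Pydantic v2 renamed .dict() to .model_dump(); use whichever exists
    dump = getattr(update, "model_dump", None) or update.dict
    return dump(exclude_unset=True)


only_flag = changed_fields(UserUpdate(is_active=False))  # one field was sent
explicit_null = changed_fields(UserUpdate(email=None))   # a null that was sent counts as set
nothing = changed_fields(UserUpdate())                   # empty body, nothing to update

print(only_flag, explicit_null, nothing)
```

Because an explicit null is kept, a client that sends `"email": null` would set the column to NULL through `setattr`; reject such input in the schema if that is not what you want.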

Delete Operation

python
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    db.delete(db_user)
    db.commit()
    return {"detail": "User deleted successfully"}

Output:

json
{
  "detail": "User deleted successfully"
}

Advanced Queries

Filtering

Let's implement a more advanced user search with multiple filters:

python
from typing import Optional

@app.get("/users/search/", response_model=List[User])
def search_users(
    email: Optional[str] = None,
    is_active: Optional[bool] = None,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    query = db.query(User)

    # Apply filters if provided; each filter() call adds another AND condition
    if email:
        query = query.filter(User.email.contains(email))
    if is_active is not None:
        query = query.filter(User.is_active == is_active)

    users = query.offset(skip).limit(limit).all()
    return users
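
Each `filter()` call narrows the query with another `AND` condition, and `contains()` renders as a `LIKE` with a wildcard on both sides of the search term. To make the resulting SQL visible, the sketch below builds a roughly equivalent statement by hand with the standard-library `sqlite3` module rather than SQLAlchemy. The sample rows and the helper `find_users` are made up for illustration.

```python
import sqlite3
from typing import List, Optional, Tuple

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE, is_active INTEGER)")
conn.executemany(
    "INSERT INTO users (email, is_active) VALUES (?, ?)",
    [("alice@example.com", 1), ("bob@example.com", 0), ("carol@example.org", 1)],
)


def find_users(email: Optional[str] = None, is_active: Optional[bool] = None) -> List[Tuple[int, str]]:
    clauses, params = [], []
    if email:
        # Same shape as the SQL produced by User.email.contains(email)
        clauses.append("email LIKE '%' || ? || '%'")
        params.append(email)
    if is_active is not None:
        clauses.append("is_active = ?")
        params.append(int(is_active))
    where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
    return conn.execute(f"SELECT id, email FROM users{where} ORDER BY id", params).fetchall()


print(find_users(email="example.com"))
print(find_users(email="example.com", is_active=True))
print(find_users(is_active=False))
```

Note that `%` and `_` inside the search term still act as wildcards, so `find_users(email="_")` matches every row here. In SQLAlchemy, `contains(email, autoescape=True)` escapes those characters for you.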

Sorting

Add sorting capabilities to the user listing endpoint:

python
from sqlalchemy import desc

@app.get("/users/sorted/", response_model=List[User])
def sorted_users(
    sort_by: str = "id",
    descending: bool = False,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    # Validate sort_by field
    if sort_by not in ["id", "email", "is_active"]:
        sort_by = "id"  # Default sort field

    # Get sort column
    sort_column = getattr(User, sort_by)

    # Apply sorting direction
    if descending:
        sort_column = desc(sort_column)

    users = db.query(User).order_by(sort_column).offset(skip).limit(limit).all()
    return users
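
The check against a fixed list of names matters because a column name cannot be passed as a bound query parameter, so anything the client sends in `sort_by` ends up selecting an attribute or a piece of SQL. The sketch below shows the same whitelist idea with the standard-library `sqlite3` module instead of SQLAlchemy. The names `SORTABLE` and `sorted_emails` and the sample rows are assumptions for illustration.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT, is_active INTEGER)")
conn.executemany(
    "INSERT INTO users (email, is_active) VALUES (?, ?)",
    [("carol@example.com", 1), ("alice@example.com", 0), ("bob@example.com", 1)],
)

SORTABLE = {"id", "email", "is_active"}


def sorted_emails(sort_by: str = "id", descending: bool = False) -> List[str]:
    if sort_by not in SORTABLE:
        sort_by = "id"  # unknown or malicious input falls back to the default
    direction = "DESC" if descending else "ASC"
    # Safe to interpolate: sort_by is now one of the literal names in SORTABLE.
    # The trailing "id ASC" breaks ties so the order is fully determined.
    rows = conn.execute(f"SELECT email FROM users ORDER BY {sort_by} {direction}, id ASC")
    return [email for (email,) in rows]


print(sorted_emails("email"))
print(sorted_emails("email", descending=True))
print(sorted_emails("email; DROP TABLE users"))
```

The last call never reaches the database with the injected text; it is replaced by the default sort field first.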

Pagination

We can enhance our pagination with more information about total counts:

python
from math import ceil

class PaginatedResponse(BaseModel):
    items: List[User]
    total: int
    page: int
    pages: int
    size: int

@app.get("/users/paginated/", response_model=PaginatedResponse)
def paginated_users(
    page: int = 1,
    size: int = 10,
    db: Session = Depends(get_db)
):
    # Ensure valid pagination parameters
    if page < 1:
        page = 1
    if size < 1:
        size = 10

    # Calculate offset
    skip = (page - 1) * size

    # Execute query; order by primary key so consecutive pages do not overlap
    users = db.query(User).order_by(User.id).offset(skip).limit(size).all()

    # Get total count for pagination info
    total = db.query(User).count()
    pages = ceil(total / size)

    return {
        "items": users,
        "total": total,
        "page": page,
        "pages": pages,
        "size": size
    }
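
The offset and page-count arithmetic is easy to get wrong at the edges, so it can help to check it in isolation. The sketch below pulls the same calculation into a standalone function; `page_window` is a hypothetical name, and the fallback values mirror the ones used in the endpoint.

```python
from math import ceil
from typing import Tuple


def page_window(page: int, size: int, total: int) -> Tuple[int, int]:
    """Return (offset, pages) for 1-based page numbers."""
    if page < 1:
        page = 1
    if size < 1:
        size = 10  # same fallback as the endpoint
    offset = (page - 1) * size
    pages = ceil(total / size)
    return offset, pages


print(page_window(page=1, size=10, total=25))
print(page_window(page=3, size=10, total=25))
print(page_window(page=1, size=10, total=0))
```

A page number beyond the last page is not an error in this scheme: the offset simply points past the final row, and the query returns an empty `items` list.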

Working with Relationships

Let's create an endpoint to get items for a specific user:

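python
from typing import Optional

# Pydantic schema; it shares its name with the SQLAlchemy Item model, so keep the
# two in separate modules (or rename one) to avoid shadowing
class Item(BaseModel):
    id: int
    title: str
    description: Optional[str] = None  # the column is nullable

    class Config:
        orm_mode = True  # Pydantic v1 setting; Pydantic v2 calls it from_attributes

@app.get("/users/{user_id}/items", response_model=List[Item])
def read_user_items(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    return db_user.items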

And an endpoint to create an item for a user:

python
from typing import Optional

class ItemCreate(BaseModel):
    title: str
    description: Optional[str] = None

@app.post("/users/{user_id}/items", response_model=Item)
def create_item_for_user(
    user_id: int,
    item: ItemCreate,
    db: Session = Depends(get_db)
):
    db_user = db.query(User).filter(User.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    # Item(...) here must resolve to the SQLAlchemy model, not the Pydantic schema of the same name
    db_item = Item(title=item.title, description=item.description, owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)

    return db_item

Query Optimization

Using Joins

When you need data from related tables, eager loading with a join fetches the parent row and its related rows in a single query, instead of the extra query that lazy loading issues the first time db_user.items is accessed:

python
from sqlalchemy.orm import joinedload

class UserWithItems(User):
    items: List[Item]

@app.get("/users-with-items/{user_id}", response_model=UserWithItems)
def get_user_with_items(user_id: int, db: Session = Depends(get_db)):
    # Using join to load related data in one query
    db_user = db.query(User).options(
        joinedload(User.items)
    ).filter(User.id == user_id).first()

    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")

    return db_user
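
To see what the join saves, the sketch below counts the SELECT statements issued when loading users and their items with and without `joinedload`. It assumes SQLAlchemy 1.4 or newer and uses an in-memory SQLite database with trimmed-down copies of the models; the sample data, the `counter` dictionary, and the helper functions are made up for illustration.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True)
    items = relationship("Item", back_populates="owner")


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    title = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))
    owner = relationship("User", back_populates="items")


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(email="a@example.com", items=[Item(title="pen"), Item(title="ink")]),
        User(email="b@example.com", items=[Item(title="cup")]),
    ])
    session.commit()

counter = {"selects": 0}


@event.listens_for(engine, "before_cursor_execute")
def count_selects(conn, cursor, statement, parameters, context, executemany):
    if statement.lstrip().upper().startswith("SELECT"):
        counter["selects"] += 1


def load_titles(eager: bool) -> dict:
    with Session(engine) as session:
        query = session.query(User).order_by(User.id)
        if eager:
            query = query.options(joinedload(User.items))
        # Touching user.items triggers one extra SELECT per user unless it was eager-loaded
        return {user.email: sorted(item.title for item in user.items) for user in query.all()}


def selects_for(eager: bool):
    counter["selects"] = 0
    titles = load_titles(eager)
    return counter["selects"], titles


lazy_count, lazy_titles = selects_for(eager=False)
eager_count, eager_titles = selects_for(eager=True)
print(lazy_count, eager_count)
```

With two users, the lazy version issues one query for the users plus one per user for their items, while the eager version issues a single joined query. The gap grows with the number of parent rows, which is the usual "N+1 queries" problem.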

Using Specific Column Selection

Sometimes you don't need all columns from a table:

python
from sqlalchemy.orm import load_only

@app.get("/users-email-only/")
def get_users_email_only(db: Session = Depends(get_db)):
    # Only select the email column (the primary key is always loaded as well)
    users = db.query(User).options(load_only(User.email)).all()
    return [{"email": user.email} for user in users]

Async Database Operations

For I/O-bound endpoints declared with async def, SQLAlchemy 1.4+ provides an asyncio API, so a request that is waiting on the database does not block the event loop:

python
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Async database URL (requires the asyncpg driver)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

# Create async engine
async_engine = create_async_engine(ASYNC_DATABASE_URL)

# Create async session
AsyncSessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=async_engine,
    class_=AsyncSession,
)

async def get_async_db():
    # "async with" closes the session on exit, so no explicit close() is needed
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/async-users/")
async def get_users_async(db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    # Return only public fields so hashed_password is not serialized
    return [{"id": u.id, "email": u.email, "is_active": u.is_active} for u in users]
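
The benefit of awaiting the database is that other work can proceed while a query is in flight. The sketch below illustrates this with plain `asyncio` and no database at all: `fake_query` is a hypothetical stand-in for an awaited database call, with `asyncio.sleep` playing the role of network latency.

```python
import asyncio

events = []


async def fake_query(name: str, delay: float) -> str:
    """Hypothetical stand-in for an awaited database call."""
    events.append(f"start {name}")
    await asyncio.sleep(delay)  # while this waits, the event loop runs the other tasks
    events.append(f"end {name}")
    return name


async def main():
    # gather() schedules all three coroutines concurrently and keeps the result order
    return await asyncio.gather(
        fake_query("users", 0.02),
        fake_query("items", 0.02),
        fake_query("posts", 0.02),
    )


results = asyncio.run(main())
print(results)
print(events)
```

All three "queries" start before any of them finishes, so the total wait is close to one delay rather than three. This only holds when the driver itself is asynchronous; a blocking database call inside an `async def` endpoint would stall the event loop instead.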

Real-World Example: Blog API

Let's create a more complete example of a blog API with users, posts, and comments:

python
from datetime import datetime
from sqlalchemy import DateTime, or_

# Models
class BlogUser(Base):
    __tablename__ = "blog_users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_admin = Column(Boolean, default=False)

    posts = relationship("BlogPost", back_populates="author")
    comments = relationship("BlogComment", back_populates="author")

class BlogPost(Base):
    __tablename__ = "blog_posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    content = Column(String)
    published = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    author_id = Column(Integer, ForeignKey("blog_users.id"))

    author = relationship("BlogUser", back_populates="posts")
    comments = relationship("BlogComment", back_populates="post")

# Named BlogComment so it does not clash with the Pydantic Comment schema below
class BlogComment(Base):
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    text = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    post_id = Column(Integer, ForeignKey("blog_posts.id"))
    author_id = Column(Integer, ForeignKey("blog_users.id"))

    post = relationship("BlogPost", back_populates="comments")
    author = relationship("BlogUser", back_populates="comments")

# Pydantic models
class CommentBase(BaseModel):
    text: str

class CommentCreate(CommentBase):
    pass

class Comment(CommentBase):
    id: int
    post_id: int
    author_id: int
    created_at: datetime

    class Config:
        orm_mode = True

class PostBase(BaseModel):
    title: str
    content: str
    published: bool = True

class PostCreate(PostBase):
    pass

class Post(PostBase):
    id: int
    author_id: int
    created_at: datetime
    comments: List[Comment] = []

    class Config:
        orm_mode = True

# API endpoints
@app.post("/blog/posts/", response_model=Post)
def create_post(post: PostCreate, user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(BlogUser).filter(BlogUser.id == user_id).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    db_post = BlogPost(**post.dict(), author_id=user_id)
    db.add(db_post)
    db.commit()
    db.refresh(db_post)
    return db_post

@app.get("/blog/posts/", response_model=List[Post])
def read_posts(
    skip: int = 0,
    limit: int = 10,
    published_only: bool = True,
    db: Session = Depends(get_db)
):
    query = db.query(BlogPost)

    if published_only:
        query = query.filter(BlogPost.published.is_(True))

    # Newest posts first
    posts = query.order_by(BlogPost.created_at.desc()).offset(skip).limit(limit).all()
    return posts

@app.get("/blog/posts/{post_id}", response_model=Post)
def read_post(post_id: int, db: Session = Depends(get_db)):
    post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

@app.post("/blog/posts/{post_id}/comments/", response_model=Comment)
def create_comment(
    post_id: int,
    comment: CommentCreate,
    user_id: int,
    db: Session = Depends(get_db)
):
    post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")

    db_comment = BlogComment(**comment.dict(), post_id=post_id, author_id=user_id)
    db.add(db_comment)
    db.commit()
    db.refresh(db_comment)
    return db_comment

@app.get("/blog/search/", response_model=List[Post])
def search_posts(
    query: str,
    skip: int = 0,
    limit: int = 10,
    db: Session = Depends(get_db)
):
    # Match the search term in either the title or the content
    posts = db.query(BlogPost).filter(
        or_(
            BlogPost.title.contains(query),
            BlogPost.content.contains(query)
        )
    ).order_by(BlogPost.created_at.desc()).offset(skip).limit(limit).all()

    return posts

Summary

In this comprehensive guide, we've explored how to perform database queries in FastAPI applications using SQLAlchemy. We've covered:

  1. Setting up database connections
  2. Creating database models
  3. Performing CRUD operations:
    • Creating records
    • Reading records (single and multiple)
    • Updating records
    • Deleting records
  4. Implementing advanced queries with filtering, sorting, and pagination
  5. Working with relationships between models
  6. Optimizing queries
  7. Using async database operations
  8. Building a real-world blog API example

Database operations are fundamental to most web applications, and pairing FastAPI with SQLAlchemy gives you a flexible way to work with them. FastAPI's request handling and dependency injection, SQLAlchemy's ORM, and Pydantic's data validation together form a solid foundation for building database-driven applications.

Additional Resources

To continue learning about FastAPI and databases:

  1. FastAPI Official Documentation on SQL Databases
  2. SQLAlchemy Documentation
  3. Alembic for Database Migrations
  4. Database Design Fundamentals

Exercises

  1. Basic Exercise: Create a simple todo application with FastAPI and SQLAlchemy that allows users to create, read, update, and delete tasks.

  2. Intermediate Exercise: Extend the blog API example to include functionality for categories and tags for blog posts.

  3. Advanced Exercise: Implement a complete user authentication system using FastAPI, SQLAlchemy, and JWT tokens, with role-based access control for different API endpoints.

With these exercises, you'll gain hands-on experience with FastAPI database operations and deepen your understanding of how to build robust database-driven applications.


