FastAPI Database Queries
Introduction
In web applications, interacting with databases is a fundamental requirement. FastAPI provides excellent integration with databases, primarily through SQLAlchemy, an SQL toolkit and Object-Relational Mapping (ORM) library for Python. This guide will walk you through performing various database queries in your FastAPI applications.
By the end of this tutorial, you'll understand how to:
- Set up database connections in FastAPI
- Perform CRUD operations (Create, Read, Update, Delete)
- Implement filtering, sorting, and pagination
- Handle database relationships
- Optimize your database queries
Prerequisites
Before we dive in, ensure you have:
- Basic knowledge of FastAPI
- Python 3.7+ installed
- Understanding of SQL fundamentals
- Familiarity with async programming concepts
Setting Up the Database Connection
First, let's set up a database connection using SQLAlchemy with FastAPI:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
# Database URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# For PostgreSQL: "postgresql://user:password@localhost/dbname"
# Create SQLAlchemy engine
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()
app = FastAPI()
# Dependency to get DB session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Defining Database Models
Let's create some models to work with:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
description = Column(String, index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
Now, let's create the tables in the database:
# Create tables in the database
Base.metadata.create_all(bind=engine)
Basic CRUD Operations
Create Operation
Let's start with creating a new user:
from pydantic import BaseModel
# Pydantic model for request data validation
class UserCreate(BaseModel):
email: str
password: str
# Create a new user
@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
# Check if user already exists
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(status_code=400, detail="Email already registered")
# Create a new User instance
db_user = User(email=user.email, hashed_password=user.password) # In real apps, hash the password!
# Add to session and commit
db.add(db_user)
db.commit()
db.refresh(db_user) # Refresh to get generated id
return db_user
Input:
{
"email": "[email protected]",
"password": "secretpassword"
}
Output:
{
"id": 1,
"email": "[email protected]",
"is_active": true
}
Read Operations
Get a single user by ID
# Pydantic model for response
class User(BaseModel):
id: int
email: str
is_active: bool
class Config:
orm_mode = True
@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
Get all users
from typing import List
@app.get("/users/", response_model=List[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = db.query(User).offset(skip).limit(limit).all()
return users
Output:
[
{
"id": 1,
"email": "[email protected]",
"is_active": true
},
{
"id": 2,
"email": "[email protected]",
"is_active": true
}
]
Update Operation
class UserUpdate(BaseModel):
email: str = None
is_active: bool = None
@app.put("/users/{user_id}", response_model=User)
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
# Update user attributes
for key, value in user.dict(exclude_unset=True).items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
Input:
{
"email": "[email protected]",
"is_active": false
}
Output:
{
"id": 1,
"email": "[email protected]",
"is_active": false
}
Delete Operation
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db.delete(db_user)
db.commit()
return {"detail": "User deleted successfully"}
Output:
{
"detail": "User deleted successfully"
}
Advanced Queries
Filtering
Let's implement a more advanced user search with multiple filters:
@app.get("/users/search/", response_model=List[User])
def search_users(
email: str = None,
is_active: bool = None,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
query = db.query(User)
# Apply filters if provided
if email:
query = query.filter(User.email.contains(email))
if is_active is not None:
query = query.filter(User.is_active == is_active)
users = query.offset(skip).limit(limit).all()
return users
Sorting
Add sorting capabilities to the user listing endpoint:
from sqlalchemy import desc
@app.get("/users/sorted/", response_model=List[User])
def sorted_users(
sort_by: str = "id",
descending: bool = False,
skip: int = 0,
limit: int = 100,
db: Session = Depends(get_db)
):
# Validate sort_by field
if sort_by not in ["id", "email", "is_active"]:
sort_by = "id" # Default sort field
# Get sort column
sort_column = getattr(User, sort_by)
# Apply sorting direction
if descending:
sort_column = desc(sort_column)
users = db.query(User).order_by(sort_column).offset(skip).limit(limit).all()
return users
Pagination
We can enhance our pagination with more information about total counts:
from math import ceil
class PaginatedResponse(BaseModel):
items: List[User]
total: int
page: int
pages: int
size: int
@app.get("/users/paginated/", response_model=PaginatedResponse)
def paginated_users(
page: int = 1,
size: int = 10,
db: Session = Depends(get_db)
):
# Ensure valid pagination parameters
if page < 1:
page = 1
if size < 1:
size = 10
# Calculate offset
skip = (page - 1) * size
# Execute query
users = db.query(User).offset(skip).limit(size).all()
# Get total count for pagination info
total = db.query(User).count()
pages = ceil(total / size)
return {
"items": users,
"total": total,
"page": page,
"pages": pages,
"size": size
}
Working with Relationships
Let's create an endpoint to get items for a specific user:
class Item(BaseModel):
id: int
title: str
description: str
class Config:
orm_mode = True
@app.get("/users/{user_id}/items", response_model=List[Item])
def read_user_items(user_id: int, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user.items
And an endpoint to create an item for a user:
class ItemCreate(BaseModel):
title: str
description: str = None
@app.post("/users/{user_id}/items", response_model=Item)
def create_item_for_user(
user_id: int,
item: ItemCreate,
db: Session = Depends(get_db)
):
db_user = db.query(User).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
db_item = Item(title=item.title, description=item.description, owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
Query Optimization
Using Joins
When you need data from related tables, using joins can be more efficient:
class UserWithItems(User):
items: List[Item]
@app.get("/users-with-items/{user_id}", response_model=UserWithItems)
def get_user_with_items(user_id: int, db: Session = Depends(get_db)):
# Using join to load related data in one query
db_user = db.query(User).options(
joinedload(User.items)
).filter(User.id == user_id).first()
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
Using Specific Column Selection
Sometimes you don't need all columns from a table:
from sqlalchemy.orm import load_only
@app.get("/users-email-only/")
def get_users_email_only(db: Session = Depends(get_db)):
# Only select email column
users = db.query(User).options(load_only(User.email)).all()
return [{"email": user.email} for user in users]
Async Database Operations
For high-performance applications, you can use async database operations with FastAPI:
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
import asyncio
# Async database URL
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# Create async engine
async_engine = create_async_engine(ASYNC_DATABASE_URL)
# Create async session
AsyncSessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=async_engine,
class_=AsyncSession,
)
async def get_async_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
@app.get("/async-users/")
async def get_users_async(db: AsyncSession = Depends(get_async_db)):
result = await db.execute(select(User))
users = result.scalars().all()
return users
Real-World Example: Blog API
Let's create a more complete example of a blog API with users, posts, and comments:
# Models
class BlogUser(Base):
__tablename__ = "blog_users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_admin = Column(Boolean, default=False)
posts = relationship("BlogPost", back_populates="author")
comments = relationship("Comment", back_populates="author")
class BlogPost(Base):
__tablename__ = "blog_posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
content = Column(String)
published = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
author_id = Column(Integer, ForeignKey("blog_users.id"))
author = relationship("BlogUser", back_populates="posts")
comments = relationship("Comment", back_populates="post")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True, index=True)
text = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
post_id = Column(Integer, ForeignKey("blog_posts.id"))
author_id = Column(Integer, ForeignKey("blog_users.id"))
post = relationship("BlogPost", back_populates="comments")
author = relationship("BlogUser", back_populates="comments")
# Pydantic models
class CommentBase(BaseModel):
text: str
class CommentCreate(CommentBase):
pass
class Comment(CommentBase):
id: int
post_id: int
author_id: int
created_at: datetime
class Config:
orm_mode = True
class PostBase(BaseModel):
title: str
content: str
published: bool = True
class PostCreate(PostBase):
pass
class Post(PostBase):
id: int
author_id: int
created_at: datetime
comments: List[Comment] = []
class Config:
orm_mode = True
# API endpoints
@app.post("/blog/posts/", response_model=Post)
def create_post(post: PostCreate, user_id: int, db: Session = Depends(get_db)):
db_user = db.query(BlogUser).filter(BlogUser.id == user_id).first()
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
db_post = BlogPost(**post.dict(), author_id=user_id)
db.add(db_post)
db.commit()
db.refresh(db_post)
return db_post
@app.get("/blog/posts/", response_model=List[Post])
def read_posts(
skip: int = 0,
limit: int = 10,
published_only: bool = True,
db: Session = Depends(get_db)
):
query = db.query(BlogPost)
if published_only:
query = query.filter(BlogPost.published == True)
posts = query.order_by(BlogPost.created_at.desc()).offset(skip).limit(limit).all()
return posts
@app.get("/blog/posts/{post_id}", response_model=Post)
def read_post(post_id: int, db: Session = Depends(get_db)):
post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
if post is None:
raise HTTPException(status_code=404, detail="Post not found")
return post
@app.post("/blog/posts/{post_id}/comments/", response_model=Comment)
def create_comment(
post_id: int,
comment: CommentCreate,
user_id: int,
db: Session = Depends(get_db)
):
post = db.query(BlogPost).filter(BlogPost.id == post_id).first()
if not post:
raise HTTPException(status_code=404, detail="Post not found")
db_comment = Comment(**comment.dict(), post_id=post_id, author_id=user_id)
db.add(db_comment)
db.commit()
db.refresh(db_comment)
return db_comment
@app.get("/blog/search/", response_model=List[Post])
def search_posts(
query: str,
skip: int = 0,
limit: int = 10,
db: Session = Depends(get_db)
):
posts = db.query(BlogPost).filter(
or_(
BlogPost.title.contains(query),
BlogPost.content.contains(query)
)
).offset(skip).limit(limit).all()
return posts
Summary
In this comprehensive guide, we've explored how to perform database queries in FastAPI applications using SQLAlchemy. We've covered:
- Setting up database connections
- Creating database models
- Performing CRUD operations:
- Creating records
- Reading records (single and multiple)
- Updating records
- Deleting records
- Implementing advanced queries with filtering, sorting, and pagination
- Working with relationships between models
- Optimizing queries
- Using async database operations
- Building a real-world blog API example
Database operations are fundamental to most web applications, and FastAPI's integration with SQLAlchemy provides a powerful and flexible way to work with databases. The combination of FastAPI's performance, SQLAlchemy's robust ORM capabilities, and Pydantic's data validation creates a solid foundation for building database-driven applications.
Additional Resources
To continue learning about FastAPI and databases:
- FastAPI Official Documentation on SQL Databases
- SQLAlchemy Documentation
- Alembic for Database Migrations
- Database Design Fundamentals
Exercises
-
Basic Exercise: Create a simple todo application with FastAPI and SQLAlchemy that allows users to create, read, update, and delete tasks.
-
Intermediate Exercise: Extend the blog API example to include functionality for categories and tags for blog posts.
-
Advanced Exercise: Implement a complete user authentication system using FastAPI, SQLAlchemy, and JWT tokens, with role-based access control for different API endpoints.
With these exercises, you'll gain hands-on experience with FastAPI database operations and deepen your understanding of how to build robust database-driven applications.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)