FastAPI Transaction Management

When building applications that interact with databases, ensuring data consistency and integrity is critical. Database transactions are a fundamental concept that allows you to group multiple operations together as a single unit of work. In this tutorial, we'll explore how to manage transactions effectively in FastAPI applications.

What are Database Transactions?

A database transaction is a sequence of operations performed as a single logical unit of work. Transactions follow the ACID properties:

  • Atomicity: All operations complete successfully, or none of them do
  • Consistency: The database moves from one valid state to another
  • Isolation: Concurrent transactions don't interfere with each other
  • Durability: Once committed, changes persist even after a system failure
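
To make atomicity concrete, here is a minimal sketch. It uses Python's standard-library sqlite3 module rather than the PostgreSQL setup used in the rest of this tutorial, so it runs without a database server. The table layout and the `transfer` and `balances` helpers are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts ("
    "id INTEGER PRIMARY KEY, "
    "balance REAL NOT NULL CHECK (balance >= 0))"
)
conn.executemany(
    "INSERT INTO accounts (id, balance) VALUES (?, ?)",
    [(1, 100.0), (2, 50.0)],
)
conn.commit()


def transfer(conn, from_id, to_id, amount):
    """Credit one account and debit another as a single unit of work."""
    try:
        # The credit is applied first so a failing debit leaves something to undo
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_id))
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_id))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        # The CHECK constraint rejected the debit: undo the credit as well
        conn.rollback()
        return False


def balances(conn):
    return dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))


print(transfer(conn, 1, 2, 500.0))  # False: the debit would overdraw account 1
print(balances(conn))               # {1: 100.0, 2: 50.0}: the credit was undone
```

The rollback is what turns two separate UPDATE statements into one all-or-nothing operation.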

Setting Up the Database Connection

Before we dive into transactions, let's set up our database connection using SQLAlchemy. We'll use SQLAlchemy's ORM features with FastAPI.

First, install the required dependencies:

bash
pip install fastapi sqlalchemy uvicorn psycopg2-binary

Now, let's create our database connection:

python
# database.py
from sqlalchemy import create_engine
# SQLAlchemy 1.4+: declarative_base lives in sqlalchemy.orm
# (the sqlalchemy.ext.declarative import path is deprecated)
from sqlalchemy.orm import declarative_base, sessionmaker

DATABASE_URL = "postgresql://username:password@localhost/dbname"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

# Dependency that yields one session per request and always closes it
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
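
The `get_db` dependency relies on how generators with `yield` behave: the code before `yield` runs first, the yielded session is handed to the endpoint, and the `finally` block runs when the request is over. The sketch below drives such a generator by hand to show that ordering. `FakeSession`, `run_request`, and the two handlers are hypothetical stand-ins, and `run_request` is only a rough simplification of what the framework does.

```python
class FakeSession:
    """Hypothetical stand-in for a SQLAlchemy session; it only records close()."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db          # the endpoint runs while the generator is paused here
    finally:
        db.close()        # runs once the request is over, success or failure


def run_request(handler):
    """Roughly what the framework does around one request (simplified)."""
    gen = get_db()
    db = next(gen)        # run get_db up to the yield
    try:
        return handler(db)
    finally:
        gen.close()       # resume the generator so its finally block runs


sessions = []


def ok_handler(db):
    sessions.append(db)
    return "ok"


def failing_handler(db):
    sessions.append(db)
    raise RuntimeError("boom")


print(run_request(ok_handler), sessions[-1].closed)  # ok True
```

The session is closed whether the handler returns normally or raises, which is why endpoints do not need to call `db.close()` themselves.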

Defining Models

Let's create sample models to demonstrate transactions:

python
# models.py
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship
from database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    email = Column(String, unique=True, nullable=False)
    accounts = relationship("Account", back_populates="owner")


class Account(Base):
    __tablename__ = "accounts"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    # Float keeps the example simple; Numeric avoids rounding errors for real money
    balance = Column(Float, default=0)
    owner = relationship("User", back_populates="accounts")

Basic Transaction Handling in FastAPI

In FastAPI, transaction management typically happens within endpoint functions. The simplest approach uses the database session's built-in transaction features:

python
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from database import get_db
import models
import schemas  # Pydantic request/response models (the order schemas are defined later)

app = FastAPI()


@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Check if email exists
    db_user = db.query(models.User).filter(models.User.email == user.email).first()
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Create new user
    new_user = models.User(name=user.name, email=user.email)
    db.add(new_user)

    # Create a default account
    account = models.Account(balance=0, owner=new_user)
    db.add(account)

    # Commit the transaction: both rows are written together
    db.commit()
    db.refresh(new_user)
    return new_user

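In the example above, the user and the account are added to the same session and written by a single commit, so they are saved together. If an exception is raised before `db.commit()`, nothing is committed: `get_db` closes the session, which discards the pending changes. If the commit itself fails, the database rolls back both inserts.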
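
The all-or-nothing behavior described above can be checked with a small standalone sketch. It assumes SQLAlchemy 1.4 or newer and swaps PostgreSQL for an in-memory SQLite database so that it runs without a server. The trimmed-down models and the `create_user_with_account` and `count` helpers are illustrative, not the tutorial's actual code.

```python
from sqlalchemy import Column, Float, ForeignKey, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    accounts = relationship("Account", back_populates="owner")


class Account(Base):
    __tablename__ = "accounts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    balance = Column(Float, default=0)
    owner = relationship("User", back_populates="accounts")


engine = create_engine("sqlite://")  # in-memory SQLite, an assumption for this demo
Base.metadata.create_all(engine)


def create_user_with_account(email, fail_before_commit=False):
    session = Session(engine)
    try:
        user = User(email=email)
        session.add(user)
        session.add(Account(balance=0, owner=user))
        session.flush()  # the INSERTs are sent, but not yet committed
        if fail_before_commit:
            raise RuntimeError("simulated failure")
        session.commit()
    finally:
        session.close()  # closing discards anything that was not committed


def count(model):
    with Session(engine) as session:
        return session.scalar(select(func.count()).select_from(model))


try:
    create_user_with_account("a@example.com", fail_before_commit=True)
except RuntimeError:
    pass
print(count(User), count(Account))  # 0 0: neither row survived the failure

create_user_with_account("a@example.com")
print(count(User), count(Account))  # 1 1: both rows were committed together
```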

Explicit Transaction Management

For more control, you can explicitly manage transactions:

python
# services.py
from sqlalchemy.orm import Session
import models


def transfer_money(
    db: Session,
    from_account_id: int,
    to_account_id: int,
    amount: float
):
    if amount <= 0:
        raise ValueError("Transfer amount must be positive")

    try:
        # The session opens a transaction on its first query. Lock both rows
        # in id order so a concurrent transfer cannot change a balance between
        # the check and the update, and opposite transfers lock consistently.
        accounts = (
            db.query(models.Account)
            .filter(models.Account.id.in_([from_account_id, to_account_id]))
            .order_by(models.Account.id)
            .with_for_update()
            .all()
        )
        by_id = {account.id: account for account in accounts}
        from_account = by_id.get(from_account_id)
        to_account = by_id.get(to_account_id)

        if not from_account or not to_account:
            raise ValueError("One or both accounts not found")

        if from_account.balance < amount:
            raise ValueError("Insufficient funds")

        # Update balances
        from_account.balance -= amount
        to_account.balance += amount

        # Commit the transaction (this also releases the row locks)
        db.commit()
        return {"message": "Transfer successful"}
    except Exception:
        # Roll back on any error, then re-raise with the original traceback
        db.rollback()
        raise

And the corresponding API endpoint:

python
# main.py
from services import transfer_money


@app.post("/transfer/")
def transfer_funds(
    transfer: schemas.Transfer,
    db: Session = Depends(get_db)
):
    try:
        result = transfer_money(
            db,
            transfer.from_account_id,
            transfer.to_account_id,
            transfer.amount
        )
        return result
    except ValueError as e:
        # Validation problems (bad amount, missing account, insufficient funds)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="Transfer failed")

Using Context Managers for Transactions

For more advanced cases, we can use context managers to handle transactions:

python
# transactions.py
from contextlib import contextmanager
from database import SessionLocal


@contextmanager
def get_transaction_session():
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

Using this context manager:

python
# services.py
from datetime import datetime

import models
from transactions import get_transaction_session

# calculate_salary, models.Employee and models.Payment are assumed to be
# defined elsewhere in the application


def process_payroll():
    with get_transaction_session() as session:
        # Get all employees
        employees = session.query(models.Employee).all()

        for employee in employees:
            # Calculate salary
            salary = calculate_salary(employee)

            # Create payment record
            payment = models.Payment(
                employee_id=employee.id,
                amount=salary,
                status="processed"
            )
            session.add(payment)

            # Update employee's last_paid date
            employee.last_paid = datetime.now()

    # If we get here, the transaction was committed
    return {"message": "Payroll processed successfully"}
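
SQLAlchemy 1.4 and newer also provide a built-in equivalent of this pattern: calling `begin()` on a `sessionmaker` returns a context manager that commits when the block completes, rolls back if it raises, and closes the session either way. The sketch below assumes an in-memory SQLite database and a trimmed `Payment` model so it can run standalone; `record_payments` and `payment_count` are hypothetical helpers.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Payment(Base):
    __tablename__ = "payments"
    id = Column(Integer, primary_key=True)
    employee_id = Column(Integer, nullable=False)
    status = Column(String, nullable=False)


engine = create_engine("sqlite://")  # in-memory SQLite, an assumption for this demo
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


def record_payments(employee_ids):
    # Commits on normal exit, rolls back on an exception, always closes
    with SessionLocal.begin() as session:
        for employee_id in employee_ids:
            session.add(Payment(employee_id=employee_id, status="processed"))


def payment_count():
    with SessionLocal() as session:
        return session.scalar(select(func.count()).select_from(Payment))


record_payments([1, 2])
try:
    record_payments([3, None])  # None violates NOT NULL, so the commit fails
except IntegrityError:
    pass
print(payment_count())  # 2: the failed batch left nothing behind
```

A hand-written context manager is still useful when you want extra behavior around the commit, such as logging or metrics.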

Handling Nested Transactions

Sometimes you may need to handle nested transactions. SQLAlchemy supports this through savepoints:

python
@app.post("/create-company/")
def create_company_with_employees(
    company_data: schemas.CompanyCreate,
    db: Session = Depends(get_db)
):
    try:
        # Create the company and flush so that company.id is populated
        company = models.Company(name=company_data.name)
        db.add(company)
        db.flush()

        try:
            # begin_nested() issues a SAVEPOINT. Leaving the block normally
            # flushes the employees and releases the savepoint; an exception
            # rolls back to it.
            with db.begin_nested():
                for employee_data in company_data.employees:
                    employee = models.Employee(
                        name=employee_data.name,
                        email=employee_data.email,
                        company_id=company.id
                    )
                    db.add(employee)
        except Exception:
            # Employee creation failed and was rolled back to the savepoint
            # (the company will still be created)
            pass

        # Commit the main transaction
        db.commit()
        return {"message": "Company created successfully"}
    except Exception as e:
        # If company creation or the final commit fails, roll back everything
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
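
At the SQL level, `begin_nested()` corresponds to a SAVEPOINT inside the outer transaction. The following sketch shows the same idea with raw SQL through the standard-library sqlite3 module, so it runs without PostgreSQL. The table layout, the savepoint name `employees`, and the `create_company` helper are assumptions for illustration.

```python
import sqlite3

# isolation_level=None stops the module from opening transactions implicitly,
# so the sketch can issue BEGIN, SAVEPOINT, and COMMIT itself
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE companies (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE employees ("
    "id INTEGER PRIMARY KEY, "
    "company_id INTEGER NOT NULL, "
    "email TEXT NOT NULL UNIQUE)"
)


def create_company(conn, name, emails):
    conn.execute("BEGIN")
    try:
        company_id = conn.execute(
            "INSERT INTO companies (name) VALUES (?)", (name,)
        ).lastrowid
        conn.execute("SAVEPOINT employees")
        try:
            for email in emails:
                conn.execute(
                    "INSERT INTO employees (company_id, email) VALUES (?, ?)",
                    (company_id, email),
                )
            conn.execute("RELEASE SAVEPOINT employees")
        except sqlite3.IntegrityError:
            # Undo only the employee inserts; the company insert survives
            conn.execute("ROLLBACK TO SAVEPOINT employees")
            conn.execute("RELEASE SAVEPOINT employees")
        conn.execute("COMMIT")
        return company_id
    except Exception:
        conn.execute("ROLLBACK")
        raise


create_company(conn, "Acme", ["a@example.com", "a@example.com"])  # duplicate email
print(conn.execute("SELECT COUNT(*) FROM companies").fetchone()[0])  # 1
print(conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0])  # 0
```

Note that the first employee insert succeeded but was still undone, because rolling back to a savepoint discards everything done since the savepoint was created.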

Best Practices for Transaction Management

  1. Keep transactions short: Long-running transactions hold locks longer, which can block other requests and hurt performance
  2. Handle errors: Always roll back when an operation fails, then re-raise or report the error
  3. Consistent state: Ensure the database remains in a consistent state regardless of success or failure
  4. Avoid nested transactions when possible: Savepoints make control flow harder to reason about
  5. Log transaction failures: Record what failed and why to help with debugging
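
Points 2 and 5 can be combined in one small helper. This is a minimal sketch using the standard-library logging and sqlite3 modules; the logger name, the `logged_transaction` helper, and the `payments` table are assumptions, and the same shape applies to a SQLAlchemy session, which also has `commit()` and `rollback()` methods.

```python
import logging
import sqlite3
from contextlib import contextmanager

logger = logging.getLogger("transactions")


@contextmanager
def logged_transaction(conn, name):
    """Commit on success; on failure roll back, log the traceback, and re-raise."""
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        logger.exception("Transaction %r failed and was rolled back", name)
        raise


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (amount REAL NOT NULL)")
conn.commit()

try:
    with logged_transaction(conn, "payroll"):
        conn.execute("INSERT INTO payments VALUES (100.0)")
        conn.execute("INSERT INTO payments VALUES (NULL)")  # violates NOT NULL
except sqlite3.IntegrityError:
    pass  # already rolled back and logged by the helper
```

Re-raising after logging keeps the caller in control of how the failure is reported to the client.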

Real-world Example: E-commerce Order Processing

Let's implement a realistic e-commerce order processing system with proper transaction management:

python
# schemas.py
from typing import List

from pydantic import BaseModel


class OrderItemCreate(BaseModel):
    product_id: int
    quantity: int


class OrderCreate(BaseModel):
    user_id: int
    items: List[OrderItemCreate]
    shipping_address_id: int


# services.py
from sqlalchemy.orm import Session

import models  # Order, OrderItem and Product are assumed to be defined there
from schemas import OrderCreate


def create_order(db: Session, order: OrderCreate):
    # The session begins a transaction automatically on first use
    try:
        # Create order record
        db_order = models.Order(
            user_id=order.user_id,
            status="pending",
            shipping_address_id=order.shipping_address_id
        )
        db.add(db_order)
        db.flush()  # Flush to get the order ID without committing

        total_amount = 0

        # Process each order item
        for item in order.items:
            if item.quantity <= 0:
                raise ValueError("Quantity must be positive")

            # Get the product and lock its row until the transaction ends
            product = db.query(models.Product).filter(models.Product.id == item.product_id).with_for_update().first()
            if not product:
                raise ValueError(f"Product with ID {item.product_id} not found")

            # Check inventory
            if product.stock < item.quantity:
                raise ValueError(f"Insufficient stock for product {product.name}")

            # Update inventory
            product.stock -= item.quantity

            # Calculate item price
            item_price = product.price * item.quantity
            total_amount += item_price

            # Create order item, recording the unit price at purchase time
            order_item = models.OrderItem(
                order_id=db_order.id,
                product_id=product.id,
                quantity=item.quantity,
                price=product.price
            )
            db.add(order_item)

        # Update order with total amount
        db_order.total_amount = total_amount

        # Commit the transaction
        db.commit()
        return db_order

    except Exception:
        # Roll back in case of error: the order, its items, and every
        # stock change are undone together
        db.rollback()
        raise

And the corresponding API endpoint:

python
# main.py
from services import create_order

# schemas.Order (the response model) is assumed to be defined in schemas.py


@app.post("/orders/", response_model=schemas.Order)
def place_order(order: schemas.OrderCreate, db: Session = Depends(get_db)):
    try:
        return create_order(db, order)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        raise HTTPException(status_code=500, detail="Failed to create order")
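
The `with_for_update()` call in `create_order` is what stops two concurrent orders from both passing the stock check for the same product: it adds `FOR UPDATE` to the SELECT, so the row stays locked until the transaction commits or rolls back. The sketch below compiles such a query for PostgreSQL without connecting to a database, just to show the generated SQL. It assumes SQLAlchemy 1.4 or newer; the trimmed `Product` model and the `locking_sql` helper are illustrative.

```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Product(Base):
    __tablename__ = "products"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    stock = Column(Integer, nullable=False)


def locking_sql(product_id):
    """Return the PostgreSQL SQL text for a row-locking product lookup."""
    stmt = select(Product).where(Product.id == product_id).with_for_update()
    return str(stmt.compile(dialect=postgresql.dialect()))


print(locking_sql(1))  # the statement ends with FOR UPDATE
```

A second transaction that runs the same locking query for that product waits until the first one finishes, and then sees the updated stock.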

Summary

Transaction management is a critical aspect of database operations in FastAPI applications. By properly implementing transactions, you ensure data integrity and consistency even when operations fail or errors occur.

We've covered:

  • Basic concepts of database transactions and ACID properties
  • Setting up database connections in FastAPI
  • Basic transaction handling using SQLAlchemy sessions
  • Explicit transaction management with commit and rollback
  • Using context managers for cleaner transaction code
  • Handling nested transactions with savepoints
  • Best practices for transaction management
  • Real-world examples of transaction usage

Further Resources and Exercises

Exercises

  1. Exercise: Implement a banking system with account creation, deposit, withdrawal, and transfer operations using proper transaction management.

  2. Exercise: Create an inventory management system where multiple users can reserve items concurrently while maintaining consistent inventory counts.

  3. Exercise: Implement an order cancellation endpoint that reverses all the effects of order creation (restores inventory, refunds payment, etc.) within a transaction.

  4. Challenge: Implement optimistic concurrency control for products that might be updated by multiple users simultaneously.
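
As a starting point for the challenge, optimistic concurrency control is commonly built on a version column: an update only applies if the version is still the one that was read. The sketch below shows the idea with the standard-library sqlite3 module; the table layout and the `read_product` and `update_price` helpers are assumptions. SQLAlchemy's ORM offers a comparable built-in mechanism through the mapper's `version_id_col` option.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER PRIMARY KEY, "
    "price REAL NOT NULL, "
    "version INTEGER NOT NULL DEFAULT 1)"
)
conn.execute("INSERT INTO products (id, price) VALUES (1, 10.0)")
conn.commit()


def read_product(conn, product_id):
    return conn.execute(
        "SELECT price, version FROM products WHERE id = ?", (product_id,)
    ).fetchone()


def update_price(conn, product_id, new_price, expected_version):
    """Apply the update only if nobody changed the row since it was read."""
    cursor = conn.execute(
        "UPDATE products SET price = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_price, product_id, expected_version),
    )
    conn.commit()
    return cursor.rowcount == 1


price, version = read_product(conn, 1)       # two users both read version 1
print(update_price(conn, 1, 12.0, version))  # True: the first writer wins
print(update_price(conn, 1, 11.0, version))  # False: stale version, no row matched
```

When an update reports failure, the caller re-reads the row and either retries or reports a conflict to the user.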

By mastering transaction management, you'll build more robust and reliable FastAPI applications that correctly handle database operations even in complex scenarios.


