FastAPI Database Connections
Database connections form the backbone of any data-driven web application. In this tutorial, we'll learn how to properly establish, manage, and optimize database connections in FastAPI applications.
Introduction
Most modern web applications need to interact with databases to store and retrieve data. FastAPI is no exception and provides several ways to work with databases efficiently. However, managing database connections properly is crucial for application performance and stability.
When building FastAPI applications that connect to databases, you'll need to consider:
- How to establish connections
- How to manage connection pools
- How to handle transactions
- How to close connections properly
In this guide, we'll explore the most common approaches using popular Python database libraries.
Setting Up Database Dependencies in FastAPI
FastAPI leverages dependency injection to manage resources like database connections. This pattern helps ensure that connections are properly acquired and released for each request.
Basic Database Connection Dependency
Let's start with a simple example using SQLAlchemy:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
# Database URL
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
# Create engine
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# Create SessionLocal class
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create Base class
Base = declarative_base()
# Dependency for database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
In this example:
- We create a database engine using SQLAlchemy
- We define a
SessionLocal
class to create database sessions - We create a dependency function
get_db()
that yields a database session - The session is automatically closed after the request is processed
The try/finally
pattern ensures that the database connection is properly closed even if there's an exception during the request.
Connection Pooling
Connection pooling is a technique that maintains a pool of open database connections that can be reused. This eliminates the overhead of establishing a new connection for each request.
SQLAlchemy implements connection pooling by default. Here's how to configure it:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"postgresql://user:password@localhost/dbname",
poolclass=QueuePool,
pool_size=5, # Maximum number of connections in the pool
max_overflow=10, # Maximum number of connections that can be created beyond the pool_size
pool_timeout=30, # Seconds to wait before giving up on getting a connection from the pool
pool_recycle=1800, # Seconds after which a connection is recycled
)
Key Pooling Parameters:
pool_size
: Number of connections to keep openmax_overflow
: Extra connections allowed when pool_size is reachedpool_timeout
: How long to wait for a connection when the pool is fullpool_recycle
: Maximum age of connections to prevent stale connections
Using the databases
Library
The databases
library provides asynchronous database support for FastAPI, which can improve performance for I/O-bound applications. Here's how to use it:
from fastapi import FastAPI
from databases import Database
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, select
# Database URL for both SQLAlchemy and the database library
DATABASE_URL = "sqlite:///./test.db"
# SQLAlchemy setup for table definitions
metadata = MetaData()
users = Table(
"users",
metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
Column("email", String),
)
# Create tables
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)
# Database instance
database = Database(DATABASE_URL)
app = FastAPI()
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/users/{user_id}")
async def read_user(user_id: int):
query = select([users]).where(users.c.id == user_id)
user = await database.fetch_one(query)
if not user:
return {"error": "User not found"}
return user
This approach uses FastAPI's event handlers to connect to the database when the application starts and disconnect when it shuts down.
Handling Multiple Databases
For applications that need to connect to multiple databases, you can create separate dependencies:
from fastapi import Depends, FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
# Database URLs
USERS_DB_URL = "sqlite:///./users.db"
PRODUCTS_DB_URL = "sqlite:///./products.db"
# Create engines
users_engine = create_engine(USERS_DB_URL, connect_args={"check_same_thread": False})
products_engine = create_engine(PRODUCTS_DB_URL, connect_args={"check_same_thread": False})
# Create SessionLocal classes
UsersSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=users_engine)
ProductsSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=products_engine)
# Dependencies
def get_users_db():
db = UsersSessionLocal()
try:
yield db
finally:
db.close()
def get_products_db():
db = ProductsSessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_users_db)):
# Query the users database
pass
@app.get("/products/{product_id}")
def read_product(product_id: int, db: Session = Depends(get_products_db)):
# Query the products database
pass
Best Practices for Database Connections in FastAPI
- Use dependency injection: Let FastAPI manage your connections through dependencies
- Implement connection pooling: Avoid creating new connections for each request
- Close connections properly: Always release connections when done
- Use try-finally blocks: Ensure connections are closed even when exceptions occur
- Consider async for I/O-bound applications: Use async databases for better performance
- Set reasonable pool sizes: Match your connection pool to your application's needs
- Monitor connection usage: Watch for connection leaks and pool exhaustion
Handling Transactions
Transactions ensure that a series of database operations are treated as a single unit:
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session
# ... (database setup code) ...
@app.post("/transfer")
def transfer_funds(
from_account: int,
to_account: int,
amount: float,
db: Session = Depends(get_db)
):
try:
# Start transaction implicitly
# Deduct from source account
from_acc = db.query(Account).filter(Account.id == from_account).first()
if not from_acc or from_acc.balance < amount:
db.rollback()
return {"error": "Insufficient funds or invalid account"}
from_acc.balance -= amount
# Add to target account
to_acc = db.query(Account).filter(Account.id == to_account).first()
if not to_acc:
db.rollback()
return {"error": "Target account not found"}
to_acc.balance += amount
# Commit the transaction
db.commit()
return {"message": "Transfer successful"}
except Exception as e:
db.rollback()
return {"error": str(e)}
Real-World Example: Complete Blog API with Database Connection
Let's build a simple blog API with proper database connection handling:
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy import Column, Integer, String, Text, create_engine, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
# Database setup
DATABASE_URL = "sqlite:///./blog.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Models
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
content = Column(Text)
author_id = Column(Integer, ForeignKey("users.id"))
author = relationship("User", back_populates="posts")
# Create tables
Base.metadata.create_all(bind=engine)
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
# User endpoints
@app.post("/users/", response_model=dict)
def create_user(username: str, email: str, db: Session = Depends(get_db)):
user = User(username=username, email=email)
db.add(user)
db.commit()
db.refresh(user)
return {"id": user.id, "username": user.username, "email": user.email}
@app.get("/users/{user_id}", response_model=dict)
def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return {"id": user.id, "username": user.username, "email": user.email}
# Post endpoints
@app.post("/posts/", response_model=dict)
def create_post(title: str, content: str, author_id: int, db: Session = Depends(get_db)):
# Verify user exists
user = db.query(User).filter(User.id == author_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
post = Post(title=title, content=content, author_id=author_id)
db.add(post)
db.commit()
db.refresh(post)
return {"id": post.id, "title": post.title, "author_id": post.author_id}
@app.get("/posts/{post_id}", response_model=dict)
def read_post(post_id: int, db: Session = Depends(get_db)):
post = db.query(Post).filter(Post.id == post_id).first()
if post is None:
raise HTTPException(status_code=404, detail="Post not found")
return {
"id": post.id,
"title": post.title,
"content": post.content,
"author": {
"id": post.author.id,
"username": post.author.username
}
}
When you run this application:
- The database connection is established through the engine
- Tables are created if they don't exist
- For each endpoint, a database session is created using the dependency
- After each request, the database session is automatically closed
Summary
In this tutorial, we've covered:
- How to establish database connections in FastAPI
- How to create and use dependency injection for database sessions
- Connection pooling for better performance
- Asynchronous database connections with the
databases
library - Managing multiple database connections
- Handling transactions properly
- A real-world example with a blog API
Database connections are a critical part of any web application, and FastAPI provides flexible tools to manage them efficiently. By following the patterns shown in this guide, you can build robust, performant applications that handle database resources properly.
Additional Resources
- FastAPI SQL Database Tutorial
- SQLAlchemy Documentation
- Databases Library
- SQLAlchemy Connection Pooling
Exercises
- Create a FastAPI application that connects to a PostgreSQL database
- Implement a RESTful API with CRUD operations for a resource of your choice
- Add support for multiple databases in a single application
- Implement a transaction that involves multiple database operations
- Configure connection pooling with optimal settings for your application
Database connections can be complex, but with the tools and patterns provided by FastAPI, you can manage them efficiently and build robust applications.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)