FastAPI Async Database

Introduction

One of FastAPI's most powerful features is its first-class support for asynchronous programming. When working with databases, asynchronous operations can significantly improve your application's performance by preventing I/O operations from blocking your web server. This is especially important for database operations that might take time to complete.

In this tutorial, we'll explore how to set up and use asynchronous database connections with FastAPI. We'll use the databases library along with SQLAlchemy Core to create a fully asynchronous database-backed API.

Prerequisites

Before we begin, make sure you have:

  • Python 3.7+
  • Basic understanding of FastAPI
  • Basic knowledge of SQL and databases
  • Familiarity with async/await in Python

Understanding Async Database Operations

Why Use Async for Databases?

Database operations are I/O-bound, meaning they spend most of their time waiting for the database to respond. With traditional synchronous code, your application would block while waiting for these operations to complete, preventing it from handling other requests.

With async database operations:

  1. Your application can handle multiple requests concurrently
  2. Server resources are used more efficiently
  3. Your API can maintain high throughput even during database-heavy operations
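
The effect of these three points can be sketched with the standard library alone. In this minimal sketch, `fake_query` is a hypothetical stand-in for a database call (it only sleeps) and is not part of any library:

```python
import asyncio
import time


async def fake_query(name: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for a database call: waits without blocking the event loop."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def handle_requests() -> list:
    # Three simulated requests wait on their queries at the same time.
    return await asyncio.gather(
        fake_query("request-1"),
        fake_query("request-2"),
        fake_query("request-3"),
    )


start = time.perf_counter()
results = asyncio.run(handle_requests())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Because the three waits overlap, the total time should be close to a single delay rather than the sum of all three, which is what a blocking version would take.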

Setting Up Async Database with FastAPI

Step 1: Install Required Packages

First, let's install the necessary packages:

bash
pip install fastapi uvicorn databases sqlalchemy asyncpg psycopg2-binary

  • fastapi: The web framework
  • uvicorn: ASGI server for running FastAPI
  • databases: Async database support
  • sqlalchemy: For database models and queries
  • asyncpg: Async PostgreSQL driver (use aiosqlite for SQLite)
  • psycopg2-binary: Sync PostgreSQL driver, needed by the create_engine call that creates the tables

Step 2: Create a Database Connection

Let's set up a PostgreSQL database connection:

python
import datetime

from databases import Database
from fastapi import FastAPI
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime

# Database URL
DATABASE_URL = "postgresql://user:password@localhost/dbname"

# SQLAlchemy
metadata = MetaData()

# Define tables
notes = Table(
    "notes",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(50)),
    Column("description", String(200)),
    Column("created_date", DateTime, default=datetime.datetime.utcnow),
)

# Create tables (synchronous, runs once when the module is imported)
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

# Database instance
database = Database(DATABASE_URL)

# FastAPI instance
app = FastAPI()

# Events
@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

This code sets up:

  • A SQLAlchemy table definition
  • A database engine to create tables
  • An async database connection using the databases library
  • Events to connect and disconnect from the database
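
Recent FastAPI releases also accept a single `lifespan` async context manager (passed as `FastAPI(lifespan=lifespan)`) as an alternative to paired startup and shutdown handlers; check the documentation for the version you use. The sketch below shows only the shape of that pattern. `FakeDatabase` and `simulate_server_run` are hypothetical stand-ins so that the example runs without a database or a server:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeDatabase:
    """Hypothetical stand-in with the same connect/disconnect shape as Database."""

    def __init__(self) -> None:
        self.events = []

    async def connect(self) -> None:
        self.events.append("connected")

    async def disconnect(self) -> None:
        self.events.append("disconnected")


database = FakeDatabase()


@asynccontextmanager
async def lifespan(app):
    # Code before `yield` runs at startup, code after it runs at shutdown.
    await database.connect()
    try:
        yield
    finally:
        await database.disconnect()


async def simulate_server_run() -> None:
    # Mimics what a server does: enter the context, serve, then exit.
    async with lifespan(app=None):
        database.events.append("serving")


asyncio.run(simulate_server_run())
print(database.events)
```

Keeping connect and disconnect in one function makes it harder to forget the cleanup half, since the `finally` block runs even if serving fails.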

Step 3: Create Pydantic Models

Let's define Pydantic models for our data validation:

python
import datetime

from pydantic import BaseModel

class NoteIn(BaseModel):
    title: str
    description: str

class Note(BaseModel):
    id: int
    title: str
    description: str
    # Use the module-qualified name so this does not shadow the datetime module imported in Step 2
    created_date: datetime.datetime
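
The table in Step 2 limits title to 50 characters and description to 200, but the input model accepts strings of any length. One possible way to reject oversized input before it reaches the database is sketched below, assuming Pydantic's `Field` with `max_length`. The helper `is_valid_note` is hypothetical and exists only to demonstrate the behaviour:

```python
from pydantic import BaseModel, Field, ValidationError


class NoteIn(BaseModel):
    # Limits mirror String(50) and String(200) in the notes table.
    title: str = Field(..., max_length=50)
    description: str = Field(..., max_length=200)


def is_valid_note(title: str, description: str) -> bool:
    """Hypothetical helper: report whether the input passes validation."""
    try:
        NoteIn(title=title, description=description)
        return True
    except ValidationError:
        return False


print(is_valid_note("Groceries", "Buy milk"))
print(is_valid_note("x" * 51, "Title is one character too long"))
```

In a FastAPI endpoint no helper is needed: a request body that fails validation is answered with a 422 response automatically.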

Step 4: Implement CRUD Operations

Now, let's implement asynchronous CRUD operations:

python
from typing import List

@app.post("/notes/", response_model=Note)
async def create_note(note: NoteIn):
    query = notes.insert().values(title=note.title, description=note.description)
    last_record_id = await database.execute(query)
    # created_date approximates the value that the column default stored
    return {**note.dict(), "id": last_record_id, "created_date": datetime.datetime.utcnow()}

@app.get("/notes/", response_model=List[Note])
async def read_notes():
    query = notes.select()
    return await database.fetch_all(query)

@app.get("/notes/{note_id}", response_model=Note)
async def read_note(note_id: int):
    query = notes.select().where(notes.c.id == note_id)
    # fetch_one returns None when no row matches; the complete example handles that case
    return await database.fetch_one(query)

@app.put("/notes/{note_id}", response_model=Note)
async def update_note(note_id: int, note: NoteIn):
    query = notes.update().where(notes.c.id == note_id).values(title=note.title, description=note.description)
    await database.execute(query)
    # Re-fetch the row so created_date is the stored value, not the time of the update
    return await database.fetch_one(notes.select().where(notes.c.id == note_id))

@app.delete("/notes/{note_id}")
async def delete_note(note_id: int):
    query = notes.delete().where(notes.c.id == note_id)
    await database.execute(query)
    return {"message": "Note deleted successfully"}

Each of these endpoint handlers is defined with async and uses await when calling database operations, ensuring our application remains non-blocking.

Complete Example: Building an Async Note-Taking API

Let's put everything together into a complete FastAPI application:

python
import datetime
from typing import List
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from databases import Database
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime

# Database URL - replace with your actual credentials
DATABASE_URL = "postgresql://user:password@localhost/notesdb"

# SQLAlchemy setup
metadata = MetaData()
notes = Table(
    "notes",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(50)),
    Column("description", String(200)),
    Column("created_date", DateTime, default=datetime.datetime.utcnow),
)

# Create tables
engine = create_engine(DATABASE_URL)
metadata.create_all(engine)

# Database
database = Database(DATABASE_URL)

# FastAPI app
app = FastAPI(title="Async Notes API")

# Models
class NoteIn(BaseModel):
    title: str
    description: str

class Note(BaseModel):
    id: int
    title: str
    description: str
    created_date: datetime.datetime

# Events
@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

# Endpoints
@app.post("/notes/", response_model=Note, status_code=201)
async def create_note(note: NoteIn):
    query = notes.insert().values(
        title=note.title,
        description=note.description,
        created_date=datetime.datetime.utcnow()
    )
    last_record_id = await database.execute(query)

    # Fetch the created note to return
    created_note = await database.fetch_one(
        notes.select().where(notes.c.id == last_record_id)
    )
    return created_note

@app.get("/notes/", response_model=List[Note])
async def read_notes():
    query = notes.select()
    return await database.fetch_all(query)

@app.get("/notes/{note_id}", response_model=Note)
async def read_note(note_id: int):
    query = notes.select().where(notes.c.id == note_id)
    note = await database.fetch_one(query)

    if note is None:
        raise HTTPException(status_code=404, detail="Note not found")

    return note

@app.put("/notes/{note_id}", response_model=Note)
async def update_note(note_id: int, note: NoteIn):
    # Check if the note exists
    query = notes.select().where(notes.c.id == note_id)
    existing_note = await database.fetch_one(query)

    if existing_note is None:
        raise HTTPException(status_code=404, detail="Note not found")

    # Update the note
    update_query = notes.update().where(notes.c.id == note_id).values(
        title=note.title,
        description=note.description
    )
    await database.execute(update_query)

    # Return the updated note
    updated_note = await database.fetch_one(query)
    return updated_note

@app.delete("/notes/{note_id}", status_code=204)
async def delete_note(note_id: int):
    # Check if the note exists
    query = notes.select().where(notes.c.id == note_id)
    note = await database.fetch_one(query)

    if note is None:
        raise HTTPException(status_code=404, detail="Note not found")

    # Delete the note
    delete_query = notes.delete().where(notes.c.id == note_id)
    await database.execute(delete_query)

    return None

To run this application:

bash
uvicorn main:app --reload
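
Once the server is running, the API can be exercised from a second terminal. The sketch below uses only the standard library and assumes uvicorn's default address, http://127.0.0.1:8000. The function names are hypothetical, and `send_create_note` only works while the server is up:

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn's default host and port


def build_create_request(title: str, description: str) -> urllib.request.Request:
    """Build (but do not send) a POST /notes/ request with a JSON body."""
    body = json.dumps({"title": title, "description": description}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/notes/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_create_note(title: str, description: str) -> dict:
    """Send the request and decode the JSON response; requires a running server."""
    with urllib.request.urlopen(build_create_request(title, description)) as response:
        return json.loads(response.read())
```

FastAPI also serves interactive documentation at /docs, which is often the quicker way to try each endpoint by hand.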

Using SQLAlchemy 2.0 with Async

SQLAlchemy 1.4 and later, including 2.0, ship native async support in the sqlalchemy.ext.asyncio extension. Here's how to use it with FastAPI:

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

# Async database URL (note the postgresql+asyncpg prefix, which selects the asyncpg driver)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/notesdb"

# Create async engine
engine = create_async_engine(DATABASE_URL, echo=True)

# Create async session factory
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Base class for declarative models
Base = declarative_base()

# Database dependency
async def get_db():
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

Then you can use it in your FastAPI endpoints:

python
from fastapi import Depends, HTTPException
from sqlalchemy.future import select

# Note here must be an ORM model declared on Base, not the Pydantic Note schema
@app.get("/notes/{note_id}")
async def get_note(note_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(Note).where(Note.id == note_id))
    note = result.scalars().first()

    if note is None:
        raise HTTPException(status_code=404, detail="Note not found")

    return note
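
The endpoint above queries an ORM class that this section never defines. A minimal sketch of what such a model might look like follows, assuming the same columns as the notes table from the earlier steps. It is named `NoteRecord` here (a hypothetical name) to avoid clashing with the Pydantic `Note` model, so the endpoint would use `select(NoteRecord)` instead:

```python
import datetime

from sqlalchemy import Column, DateTime, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class NoteRecord(Base):
    """ORM counterpart of the notes table (hypothetical name)."""

    __tablename__ = "notes"

    id = Column(Integer, primary_key=True)
    title = Column(String(50))
    description = Column(String(200))
    created_date = Column(DateTime, default=datetime.datetime.utcnow)


# Building the statement needs no database connection.
statement = select(NoteRecord).where(NoteRecord.id == 1)
print(statement)
```

Printing the statement shows the SQL that SQLAlchemy would send, which is a convenient way to check a query before wiring it into an endpoint.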

Best Practices for Async Database Operations

  1. Always await database operations: Never leave an async database call without awaiting it.

  2. Use connection pooling: Most async database libraries support connection pooling to efficiently reuse connections.

  3. Handle errors properly: Use try/except blocks to catch and handle database errors.

python
@app.get("/notes/{note_id}")
async def get_note(note_id: int):
    try:
        query = notes.select().where(notes.c.id == note_id)
        note = await database.fetch_one(query)
    except Exception as e:
        # Log the error
        print(f"Database error: {str(e)}")
        raise HTTPException(status_code=500, detail="Database error occurred")
    # Raise the 404 outside the try block so it is not caught and turned into a 500
    if note is None:
        raise HTTPException(status_code=404, detail="Note not found")
    return note

  4. Consider pagination for large datasets: Avoid fetching too many rows at once.

python
@app.get("/notes/")
async def read_notes(skip: int = 0, limit: int = 100):
    # Order by id so that pages are stable between requests
    query = notes.select().order_by(notes.c.id).offset(skip).limit(limit)
    return await database.fetch_all(query)

  5. Use migrations for database schema changes: Tools like Alembic help manage database schema changes.

Performance Considerations

Async database operations provide several performance benefits:

  1. Concurrent execution: Multiple database operations can be performed concurrently.
python
import asyncio

# Execute multiple queries concurrently
async def fetch_both_ranges():
    return await asyncio.gather(
        database.fetch_all(notes.select().where(notes.c.id < 100)),
        database.fetch_all(notes.select().where(notes.c.id >= 100)),
    )

  2. Non-blocking I/O: The server remains responsive while waiting for database results.

  3. Resource utilization: Better utilization of server resources across multiple requests.
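
When many queries are fanned out with asyncio.gather, it may be worth capping how many run at once so they do not all compete for the connection pool. The sketch below shows one way to do that with asyncio.Semaphore. `fake_query` is a hypothetical stand-in for a database call, and the limit of 3 is an arbitrary value for illustration:

```python
import asyncio


async def fake_query(i: int, semaphore: asyncio.Semaphore, tracker: dict) -> int:
    """Hypothetical stand-in for a query; records how many run at the same time."""
    async with semaphore:
        tracker["active"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["active"])
        await asyncio.sleep(0.01)  # simulated wait for the database
        tracker["active"] -= 1
        return i * 2


async def run_bounded(n_queries: int, max_concurrent: int):
    # Create the semaphore inside the running event loop.
    semaphore = asyncio.Semaphore(max_concurrent)
    tracker = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fake_query(i, semaphore, tracker) for i in range(n_queries))
    )
    return results, tracker["peak"]


results, peak = asyncio.run(run_bounded(n_queries=10, max_concurrent=3))
print(results, peak)
```

asyncio.gather returns results in the order the awaitables were passed in, so the cap changes only how many queries are in flight, not the order of the output.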

Summary

Asynchronous database operations with FastAPI enable you to build high-performance web APIs that can handle multiple concurrent requests efficiently. By using the databases library or SQLAlchemy's async support, you can create non-blocking database interactions that significantly improve your application's performance.

In this tutorial, we've covered:

  1. Setting up async database connections with FastAPI
  2. Creating CRUD endpoints using async database operations
  3. Building a complete async-enabled notes API
  4. Best practices for async database programming
  5. Performance considerations and benefits

Exercises

  1. Extend the notes API to include user authentication using async database operations.
  2. Implement async pagination for the notes endpoint.
  3. Create a search endpoint that queries notes asynchronously based on title or content.
  4. Implement database transactions for operations that require multiple writes.
  5. Add unit tests for your async database functions using pytest-asyncio.

