FastAPI Performance Tips

FastAPI is already one of the fastest Python frameworks available, but there are still ways to make your applications even more performant. This guide explores practical techniques to optimize your FastAPI applications and ensure they can handle high loads efficiently.

Introduction to FastAPI Performance

FastAPI is built on Starlette and Pydantic, with performance as a core design principle. While it's fast by default, understanding certain optimization strategies can help you squeeze out even more performance, especially for production applications with high traffic.

Performance optimization in FastAPI typically involves:

  1. Making proper use of async features
  2. Implementing caching strategies
  3. Optimizing database interactions
  4. Fine-tuning deployment configurations
  5. Minimizing response payload sizes

Let's examine each of these aspects in detail.

Leveraging Asynchronous Operations

FastAPI's support for asynchronous programming is a key performance feature.

When to Use Async

Async operations provide the most benefit when your application:

  • Makes network calls (API requests, database queries)
  • Performs I/O operations
  • Needs to handle many concurrent requests

Basic Async Endpoint Example

python
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

@app.get("/sync")
def sync_route():
    # This blocks the worker handling this request
    time.sleep(1)
    return {"message": "Processed synchronously"}

@app.get("/async")
async def async_route():
    # This allows the worker to handle other requests during this wait
    await asyncio.sleep(1)
    return {"message": "Processed asynchronously"}

When your server is under load, the async version can handle significantly more concurrent requests because it doesn't block the worker during the sleep period.
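The effect is easy to demonstrate outside a web server as well. This small self-contained sketch (plain asyncio, no FastAPI) runs three simulated I/O waits concurrently; they finish in roughly the time of one wait rather than the sum:

```python
import asyncio
import time

async def io_task():
    # Stand-in for an awaited network or database call
    await asyncio.sleep(0.2)

async def main():
    start = time.perf_counter()
    # The three waits overlap instead of running one after another
    await asyncio.gather(io_task(), io_task(), io_task())
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.2s, not 0.6s
```

This is exactly what happens inside FastAPI: while one request awaits, the event loop serves others.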

Async with Dependencies

Make your dependencies async as well for optimal performance:

python
from fastapi import FastAPI, Depends
import asyncio

app = FastAPI()

async def get_user_data():
    await asyncio.sleep(0.5)  # Simulate async database query
    return {"user_id": 123, "name": "FastAPI User"}

@app.get("/users/me")
async def read_user_me(user_data: dict = Depends(get_user_data)):
    return user_data

Implementing Caching Strategies

Caching is one of the most effective ways to improve API performance.

In-Memory Caching

For simple cases, you can use a Python dictionary:

python
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

# Simple cache
cache = {}
CACHE_EXPIRY = 60  # seconds

@app.get("/expensive-operation/{item_id}")
async def expensive_operation(item_id: int):
    # Check if result is in cache and not expired
    current_time = time.time()
    if item_id in cache and (current_time - cache[item_id]["timestamp"] < CACHE_EXPIRY):
        return {"result": cache[item_id]["data"], "source": "cache"}

    # Simulate expensive operation
    await asyncio.sleep(2)  # e.g., complex calculation or database query
    result = {"value": item_id * 100, "computed_at": current_time}

    # Store in cache
    cache[item_id] = {"data": result, "timestamp": current_time}

    return {"result": result, "source": "computed"}
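The dictionary pattern above can be factored into a small reusable decorator. Here is a hedged sketch (the `ttl_cache` name and implementation are illustrative, not a library API); it caches by hashable positional arguments only:

```python
import functools
import time

def ttl_cache(seconds):
    """Minimal TTL cache for functions of hashable positional args (illustrative sketch)."""
    def decorator(func):
        store = {}

        @functools.wraps(func)
        def wrapper(*args):
            now = time.time()
            entry = store.get(args)
            if entry is not None and now - entry[1] < seconds:
                return entry[0]  # cache hit, still fresh
            result = func(*args)
            store[args] = (result, now)
            return result

        return wrapper
    return decorator

calls = {"count": 0}

@ttl_cache(seconds=60)
def expensive(item_id):
    calls["count"] += 1  # count real computations to show the cache working
    return item_id * 100

print(expensive(5), expensive(5))  # 500 500
print(calls["count"])  # 1 -- the second call was served from the cache
```

A real application would also need eviction of stale entries; this sketch keeps them until overwritten.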

Redis Caching

For production applications, Redis is often a better choice:

python
from fastapi import FastAPI, Depends
import redis.asyncio as redis
import json
import asyncio

app = FastAPI()

# Setup Redis connection
async def get_redis():
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    try:
        yield redis_client
    finally:
        await redis_client.close()

@app.get("/data/{item_id}")
async def get_data(item_id: str, redis_client: redis.Redis = Depends(get_redis)):
    # Try to get from cache
    cached_data = await redis_client.get(f"item:{item_id}")

    if cached_data:
        return {"data": json.loads(cached_data), "source": "cache"}

    # Simulate data retrieval
    await asyncio.sleep(1)
    data = {"id": item_id, "name": f"Item {item_id}", "details": "Some computed details"}

    # Store in cache with expiration (30 seconds)
    await redis_client.setex(
        f"item:{item_id}",
        30,  # expiry in seconds
        json.dumps(data)
    )

    return {"data": data, "source": "database"}

Optimizing Database Interactions

Database operations are often the biggest bottleneck in API performance.

Using Async Database Drivers

If you're using SQLAlchemy with FastAPI, consider using the async version:

python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
import models  # Your SQLAlchemy models

# Database URL for async connection
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"

engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

app = FastAPI()

async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    finally:
        await db.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async query
    user = await db.get(models.User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Batch Processing

Instead of making multiple individual database queries, batch them:

python
from sqlalchemy import select

@app.get("/users")
async def get_multiple_users(user_ids: str, db: AsyncSession = Depends(get_db)):
    # Parse comma-separated ids
    id_list = [int(id) for id in user_ids.split(",")]

    # Single query to get multiple users
    query = select(models.User).where(models.User.id.in_(id_list))
    result = await db.execute(query)
    users = result.scalars().all()

    return users

Use Database Indexes

Make sure your database tables have appropriate indexes for the queries you run most often. For example, if you frequently query users by email:

sql
CREATE INDEX idx_users_email ON users(email);
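You can watch an index change a query plan without spinning up Postgres. The following sketch uses Python's built-in `sqlite3` purely for illustration (the exact `EXPLAIN QUERY PLAN` wording varies by SQLite version):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [(f"user{i}@example.com",) for i in range(1000)],
)

# Without an index, the lookup scans the whole table
plan_before = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?",
    ("user42@example.com",),
).fetchone()[3]
print(plan_before)  # e.g. "SCAN users"

conn.execute("CREATE INDEX idx_users_email ON users(email)")

# With the index, the database can jump straight to the matching row
plan_after = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?",
    ("user42@example.com",),
).fetchone()[3]
print(plan_after)  # e.g. "SEARCH users USING INDEX idx_users_email (email=?)"
```

The same principle applies to Postgres, where `EXPLAIN ANALYZE` shows whether a query uses a sequential scan or an index scan.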

Response Optimization Techniques

Use Compression

FastAPI (via Starlette) supports response compression:

python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

# Add GZip compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)

Pagination for Large Datasets

Always paginate when returning large collections:

python
from fastapi import FastAPI, Query
from typing import List
import models

app = FastAPI()

@app.get("/items", response_model=List[models.Item])
async def read_items(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100)
):
    # Return paginated results (`db.get_items` stands in for your data access layer)
    return db.get_items(skip=skip, limit=limit)

Selective Field Returns

Use Pydantic to return only the needed fields:

python
from fastapi import FastAPI
from typing import List
from pydantic import BaseModel

class ItemBase(BaseModel):
    id: int
    name: str
    description: str
    price: float
    stock: int
    supplier_info: str
    manufacturing_details: str

class ItemBrief(BaseModel):
    id: int
    name: str
    price: float

app = FastAPI()

@app.get("/items", response_model=List[ItemBrief])
async def list_items():
    # Even if the database returns all fields, only id, name, and price
    # will be in the response (`get_items_from_db` is a placeholder for
    # your data access layer)
    return get_items_from_db()

Deployment Considerations

Use Proper Workers

When deploying with Uvicorn, set the number of workers appropriately:

bash
uvicorn app:app --workers 4

A good starting point is setting workers to (2 x number_of_cores) + 1.
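If you'd rather not hard-code that number, you can derive it from the machine at startup. A minimal sketch, assuming the (2 x cores) + 1 heuristic above (`os.cpu_count()` can return `None`, hence the fallback):

```python
import os

# Derive the Uvicorn worker count from available cores: (2 x cores) + 1
cores = os.cpu_count() or 1
workers = cores * 2 + 1
print(f"uvicorn app:app --workers {workers}")
```

In a deployment script you would run the printed command (or pass `workers=workers` to `uvicorn.run`) instead of echoing it. Treat the heuristic as a starting point and tune under realistic load.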

Configure Connection Pooling

For database connections, use connection pooling:

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

# Create engine with connection pooling
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,       # Maximum number of persistent connections
    max_overflow=10,    # Maximum number of extra connections
    pool_timeout=30,    # Seconds to wait for a connection
    pool_recycle=1800,  # Recycle connections after 30 minutes
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

Real-world Example: Optimized API Endpoint

Let's put everything together in a real-world example for an e-commerce API:

python
from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from redis.asyncio import Redis
import json
from typing import List, Optional
from . import models, schemas, database

app = FastAPI()

# Dependency for Redis connection
async def get_redis():
    redis_client = Redis(host='localhost', port=6379, db=0)
    try:
        yield redis_client
    finally:
        await redis_client.close()

# Dependency for database connection
async def get_db():
    session = database.AsyncSessionLocal()
    try:
        yield session
    finally:
        await session.close()

@app.get("/products", response_model=List[schemas.ProductBrief])
async def get_products(
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    sort_by: str = Query("popularity", enum=["price", "popularity", "newest"]),
    order: str = Query("desc", enum=["asc", "desc"]),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    redis: Redis = Depends(get_redis)
):
    # Build cache key based on all parameters
    cache_key = f"products:{category}:{min_price}:{max_price}:{sort_by}:{order}:{page}:{page_size}"

    # Try to get from cache
    cached_data = await redis.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # Generate filters based on query parameters
    filters = []
    if category:
        filters.append(models.Product.category == category)
    if min_price is not None:
        filters.append(models.Product.price >= min_price)
    if max_price is not None:
        filters.append(models.Product.price <= max_price)

    # Calculate offset for pagination
    skip = (page - 1) * page_size

    # Perform a single optimized database query
    sort_column = getattr(models.Product, sort_by)
    query = (
        select(models.Product)
        .where(*filters)
        .order_by(sort_column.desc() if order == "desc" else sort_column.asc())
        .offset(skip)
        .limit(page_size)
    )
    result = await db.execute(query)
    products = result.scalars().all()

    # Convert to response model
    response = [schemas.ProductBrief.from_orm(p) for p in products]

    # Cache the result for 5 minutes
    await redis.setex(cache_key, 300, json.dumps([p.dict() for p in response]))

    return response

This example includes:

  • Async database access
  • Redis caching with a cache key based on all query parameters
  • Pagination to limit response size
  • Filtering and sorting options
  • Response model to limit returned fields

Performance Monitoring

To ensure your optimizations are effective, implement performance monitoring:

python
from fastapi import FastAPI, Request
import time
import logging

app = FastAPI()
logger = logging.getLogger("api")

@app.middleware("http")
async def log_request_time(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time

    # Log requests taking longer than 500ms
    if process_time > 0.5:
        logger.warning(
            f"Long request: {request.method} {request.url.path} took {process_time:.4f} seconds"
        )

    response.headers["X-Process-Time"] = str(process_time)
    return response

Summary

FastAPI is designed for performance, but applying these optimization strategies can help you achieve even better results:

  1. Use async operations when making I/O or network calls
  2. Implement caching for expensive operations and frequently accessed data
  3. Optimize database interactions with async drivers, connection pooling, and batch operations
  4. Minimize response payload using pagination and selective field returns
  5. Configure deployments properly with the right number of workers
  6. Monitor performance to identify bottlenecks

Remember that premature optimization can be counterproductive. Start with a clean, maintainable FastAPI application, and apply these performance tips when you've identified specific bottlenecks through profiling and monitoring.

Exercises

  1. Take an existing synchronous FastAPI endpoint and convert it to use async/await.
  2. Implement a simple in-memory caching system for a computationally expensive endpoint.
  3. Add Redis caching to a FastAPI application that queries a database.
  4. Create a paginated endpoint that returns different fields based on a query parameter.
  5. Set up performance monitoring middleware that logs slow requests and helps identify potential bottlenecks.


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)