
FastAPI Performance Tuning

Introduction

FastAPI is already built for high performance, thanks to its foundation on Starlette and Pydantic. However, as your application grows in complexity and traffic, you may need additional optimizations to maintain or improve its performance.

In this guide, we'll explore various strategies to tune your FastAPI application for optimal performance. We'll cover techniques ranging from database optimizations to asynchronous programming patterns, caching, and deployment configurations.

Why Performance Matters

Before diving into optimization techniques, it's important to understand why performance matters:

  • User Experience: Faster response times lead to better user experiences
  • Resource Efficiency: Optimized applications require fewer server resources
  • Cost Savings: More efficient applications mean lower infrastructure costs
  • Scalability: Well-optimized applications scale better under heavy loads

Baseline Performance Measurement

Before optimizing anything, establish a performance baseline:

python
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

@app.get("/")
async def read_root():
    return {"Hello": "World"}

This middleware adds a header showing processing time, giving you a simple way to measure the impact of your optimizations.
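
As a quick sanity check, you can read that header from a small client script. Here's a minimal sketch using httpx, assuming the app above is running locally at http://localhost:8000 (e.g., via uvicorn main:app):

python
import httpx

# Hit the endpoint a few times and print the server-reported processing time
for _ in range(5):
    response = httpx.get("http://localhost:8000/")
    print(response.headers["X-Process-Time"])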

1. Asynchronous Programming

FastAPI is built for async, but you need to use it properly:

Use Async Where It Matters

python
# ❌ Bad: blocking call inside an async endpoint
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    # This blocks the event loop, stalling every other request
    result = make_blocking_database_call(item_id)
    return result

# ✅ Good: awaiting an async client for I/O-bound operations
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    # This frees up the event loop during the I/O wait
    result = await make_async_database_call(item_id)
    return result

Note that a plain def endpoint is run in a threadpool by FastAPI, so it won't block the event loop (though it pays thread overhead); the real anti-pattern is calling blocking code inside an async def endpoint, as shown above.
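
If you're stuck with a blocking library inside an async endpoint, you can push the call onto a worker thread yourself. A minimal sketch using asyncio.to_thread (Python 3.9+), where make_blocking_database_call stands in for any blocking function:

python
import asyncio

@app.get("/legacy-items/{item_id}")
async def read_legacy_item(item_id: int):
    # Runs the blocking call in a worker thread so the event loop stays free
    result = await asyncio.to_thread(make_blocking_database_call, item_id)
    return result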

Use Proper Libraries

For true async performance gains, use async-compatible libraries:

  • Database: SQLAlchemy 2.0, asyncpg, motor
  • HTTP Clients: httpx, aiohttp
  • Redis: aioredis (since merged into redis-py as redis.asyncio)

Example with asyncpg:

python
import asyncpg
from fastapi import FastAPI, Depends

app = FastAPI()

async def get_connection():
    conn = await asyncpg.connect("postgresql://user:password@localhost/db")
    try:
        yield conn
    finally:
        await conn.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn = Depends(get_connection)):
    row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    return dict(row) if row else {"error": "User not found"}

Note that this dependency opens a new connection on every request, which is expensive; the connection pooling pattern in the next section avoids that cost.

2. Database Optimization

Connection Pooling

python
import asyncpg
from fastapi import FastAPI

app = FastAPI()
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/db",
        min_size=5,
        max_size=20
    )

@app.on_event("shutdown")
async def shutdown():
    await pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
        return dict(row) if row else {"error": "User not found"}

Query Optimization

  • Use indices for frequently queried columns
  • Select only the columns you need
  • Use batch operations when possible (see the sketch after the code below)
python
# ❌ Bad: Selecting all columns
@app.get("/users")
async def get_users():
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT * FROM users LIMIT 100")
        return [dict(row) for row in rows]

# ✅ Better: Selecting only needed columns
@app.get("/users")
async def get_users():
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT id, name, email FROM users LIMIT 100")
        return [dict(row) for row in rows]
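
For batch writes, asyncpg's executemany sends one prepared statement with many parameter sets instead of a round trip per row. A minimal sketch, reusing the pool from above and assuming the users table has name and email columns:

python
@app.post("/users/batch")
async def create_users(users: list[dict]):
    async with pool.acquire() as conn:
        # One prepared statement, many parameter tuples: far fewer round trips
        await conn.executemany(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            [(u["name"], u["email"]) for u in users]
        )
    return {"created": len(users)}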

3. Response Optimization

Use Response Models

Define response models to ensure you're only returning necessary data:

python
from pydantic import BaseModel
from fastapi import FastAPI

app = FastAPI()

class UserResponse(BaseModel):
    id: int
    name: str
    # Only fields we need, not the full DB model

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    # Database query that might return more fields than needed
    user_data = await get_user_from_db(user_id)
    return user_data  # FastAPI will filter fields automatically

Response Compression

With Starlette's GZipMiddleware, FastAPI can compress responses above a minimum size whenever the client advertises gzip support:

python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

4. Caching Strategies

In-Memory Caching

python
from fastapi import FastAPI
import time

app = FastAPI()
cache = {}  # note: unbounded; see the bounded alternative below
CACHE_EXPIRY = 60  # seconds

@app.get("/cached-data/{item_id}")
async def get_cached_data(item_id: str):
    current_time = time.time()

    # Check if data exists in cache and is not expired
    if item_id in cache and cache[item_id]["expiry"] > current_time:
        return cache[item_id]["data"]

    # Expensive operation (e.g., DB query)
    data = await fetch_expensive_data(item_id)

    # Store in cache
    cache[item_id] = {
        "data": data,
        "expiry": current_time + CACHE_EXPIRY
    }

    return data
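
A plain dict never evicts entries, so it grows without bound. One bounded alternative is a TTL cache from the third-party cachetools package (an assumption here, not something the snippet above requires); a minimal sketch:

python
from cachetools import TTLCache  # pip install cachetools

# At most 1024 entries, each expiring 60 seconds after insertion
ttl_cache = TTLCache(maxsize=1024, ttl=60)

@app.get("/cached-data-v2/{item_id}")
async def get_cached_data_v2(item_id: str):
    if item_id in ttl_cache:
        return ttl_cache[item_id]
    data = await fetch_expensive_data(item_id)
    ttl_cache[item_id] = data
    return data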

Redis Caching

For distributed applications, use Redis:

python
import aioredis
import json
from fastapi import FastAPI

app = FastAPI()
redis = None

@app.on_event("startup")
async def startup():
    global redis
    # from_url returns a client directly in aioredis 2.x; connections are lazy
    redis = aioredis.from_url("redis://localhost")

@app.on_event("shutdown")
async def shutdown():
    await redis.close()

@app.get("/products/{product_id}")
async def get_product(product_id: int):
    # Try to get from cache
    cached = await redis.get(f"product:{product_id}")
    if cached:
        return json.loads(cached)

    # If not in cache, get from DB
    product = await get_product_from_db(product_id)

    # Store in cache for 5 minutes
    await redis.set(
        f"product:{product_id}",
        json.dumps(product),
        ex=300
    )

    return product

5. Background Tasks

Offload time-consuming processes with background tasks:

python
import asyncio
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def process_notification(email: str, message: str):
    # Simulate an expensive operation, e.g. sending an email
    await asyncio.sleep(5)
    print(f"Notification sent to {email}: {message}")

@app.post("/orders/")
async def create_order(order: dict, background_tasks: BackgroundTasks):
    # Save order to database
    order_id = await save_order(order)

    # This happens after the response is sent to the client
    background_tasks.add_task(
        process_notification,
        order["email"],
        f"Your order #{order_id} has been received"
    )

    return {"order_id": order_id, "status": "processing"}

6. Rate Limiting

Protect your API from abuse and prevent overloading:

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import time

app = FastAPI()

# Simple in-memory rate limiter
class RateLimiter:
    def __init__(self, requests_per_minute=60):
        self.requests_per_minute = requests_per_minute
        self.window_size = 60  # 1 minute
        self.client_requests = {}

    async def is_rate_limited(self, client_id: str) -> bool:
        current_time = time.time()

        if client_id not in self.client_requests:
            self.client_requests[client_id] = []

        # Remove requests older than the window size
        self.client_requests[client_id] = [
            timestamp for timestamp in self.client_requests[client_id]
            if current_time - timestamp < self.window_size
        ]

        # Check if the client exceeds the rate limit
        if len(self.client_requests[client_id]) >= self.requests_per_minute:
            return True

        # Add the current request's timestamp
        self.client_requests[client_id].append(current_time)
        return False

limiter = RateLimiter(requests_per_minute=60)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_id = request.client.host
    if await limiter.is_rate_limited(client_id):
        # An HTTPException raised inside middleware isn't caught by FastAPI's
        # exception handlers, so return the 429 response directly
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})

    response = await call_next(request)
    return response
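
Note that this limiter keeps state per process, so counts reset on restart and aren't shared across workers. For multi-worker deployments, a shared store works better; a minimal fixed-window sketch using the Redis client from the caching section:

python
async def is_rate_limited_redis(client_id: str, limit: int = 60) -> bool:
    key = f"ratelimit:{client_id}"
    count = await redis.incr(key)
    if count == 1:
        # First request in this window: start the 60-second expiry clock
        await redis.expire(key, 60)
    return count > limit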

7. Deployment Optimization

Worker Configuration

When deploying with Uvicorn or Gunicorn, optimize worker count:

bash
# For CPU-bound applications
gunicorn -w $(nproc) -k uvicorn.workers.UvicornWorker main:app

# For I/O-bound applications
gunicorn -w $(( 2 * $(nproc) + 1 )) -k uvicorn.workers.UvicornWorker main:app

Using ASGI Servers

FastAPI works with various ASGI servers. Uvicorn with Gunicorn is common, but you can also try Hypercorn:

bash
# Install Hypercorn
pip install hypercorn

# Run with Hypercorn
hypercorn main:app --workers 4

Real-World Example: Optimized Blog API

Let's tie everything together with a comprehensive example of an optimized blog API:

python
import asyncio
import asyncpg
import aioredis
import json
import time
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List
from datetime import datetime

# Models
class PostBase(BaseModel):
    title: str
    content: str

class PostCreate(PostBase):
    author_id: int

class PostResponse(PostBase):
    id: int
    author_id: int
    created_at: datetime

    class Config:
        orm_mode = True

# Application
app = FastAPI(title="Optimized Blog API")

# Database and Redis connections
db_pool = None
redis = None

@app.on_event("startup")
async def startup():
    global db_pool, redis
    db_pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/blogdb",
        min_size=5,
        max_size=20
    )
    # from_url returns a client directly in aioredis 2.x; connections are lazy
    redis = aioredis.from_url("redis://localhost")

    # Create tables if they don't exist
    async with db_pool.acquire() as conn:
        await conn.execute('''
            CREATE TABLE IF NOT EXISTS posts (
                id SERIAL PRIMARY KEY,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                author_id INTEGER NOT NULL,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        ''')

@app.on_event("shutdown")
async def shutdown():
    await db_pool.close()
    await redis.close()

# Helper functions
async def log_activity(action: str, details: dict):
    """Simulates logging to an external system"""
    await asyncio.sleep(0.5)  # Simulating a network call
    print(f"LOGGED: {action} - {json.dumps(details)}")

# Routes
@app.get("/posts/", response_model=List[PostResponse])
async def get_posts(skip: int = 0, limit: int = 10):
    # Try to get from cache
    cache_key = f"posts:list:{skip}:{limit}"
    cached = await redis.get(cache_key)

    if cached:
        return json.loads(cached)

    # If not in cache, query the database
    async with db_pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT id, title, content, author_id, created_at
            FROM posts
            ORDER BY created_at DESC
            LIMIT $1 OFFSET $2
            """,
            limit, skip
        )

    # Convert to a list of dictionaries
    posts = [dict(row) for row in rows]

    # Cache results for 1 minute
    await redis.set(cache_key, json.dumps(posts, default=str), ex=60)

    return posts

@app.get("/posts/{post_id}", response_model=PostResponse)
async def get_post(post_id: int):
# Try to get from cache
cache_key = f"posts:{post_id}"
cached = await redis.get(cache_key)

if cached:
return json.loads(cached)

# If not in cache, query database
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, title, content, author_id, created_at FROM posts WHERE id = $1",
post_id
)

if not row:
raise HTTPException(status_code=404, detail="Post not found")

post = dict(row)

# Cache results for 5 minutes
await redis.set(cache_key, json.dumps(post, default=str), ex=300)

return post

@app.post("/posts/", response_model=PostResponse, status_code=201)
async def create_post(post: PostCreate, background_tasks: BackgroundTasks):
async with db_pool.acquire() as conn:
# Insert post
row = await conn.fetchrow(
"""
INSERT INTO posts (title, content, author_id)
VALUES ($1, $2, $3)
RETURNING id, title, content, author_id, created_at
""",
post.title, post.content, post.author_id
)

new_post = dict(row)

# Log activity in background
background_tasks.add_task(
log_activity,
"post_created",
{"post_id": new_post["id"], "author_id": post.author_id}
)

# Invalidate cache for post list
await redis.delete("posts:list:0:10") # Most common query

return new_post

# Add middleware for timing
@app.middleware("http")
async def add_process_time_header(request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response

Performance Tips Summary

  1. Use async properly: Ensure I/O operations don't block the event loop
  2. Optimize database access: Use connection pools, indices, and query only what you need
  3. Implement caching: Use Redis or in-memory caching for frequent data
  4. Offload heavy processing: Use background tasks for operations that don't need immediate results
  5. Response optimization: Use response models and compression
  6. Connection pooling: Reuse connections to databases and external services
  7. Properly configure workers: Match your deployment to your application needs
  8. Rate limiting: Protect your API from abuse
  9. Monitor and measure: You can't optimize what you don't measure

Common Pitfalls to Avoid

  • Blocking the event loop: Using synchronous libraries in async functions
  • N+1 query problem: Making separate database queries in loops (see the sketch after this list)
  • Over-optimization: Optimizing before identifying actual bottlenecks
  • Memory leaks: Not properly managing resources (connection pools, cached data)
  • Too many workers: Setting too many workers can actually degrade performance
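
To make the N+1 problem concrete, here's a minimal sketch with asyncpg, reusing the pool pattern from earlier and assuming a hypothetical authors table with id and name columns. The first version issues one query per author; the second fetches all of them in a single round trip:

python
# ❌ N+1: one query per author
async def get_authors_slow(author_ids: list[int]):
    async with pool.acquire() as conn:
        return [
            dict(await conn.fetchrow("SELECT id, name FROM authors WHERE id = $1", aid))
            for aid in author_ids
        ]

# ✅ One round trip using ANY with an array parameter
async def get_authors_fast(author_ids: list[int]):
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, name FROM authors WHERE id = ANY($1::int[])",
            author_ids
        )
        return [dict(row) for row in rows]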

Summary

Performance tuning FastAPI applications involves a holistic approach addressing databases, caching, asynchronous patterns, and deployment configurations. By establishing baselines, measuring impact, and applying targeted optimizations, you can ensure your FastAPI application remains fast and scalable even as it grows.

Remember that optimization should be driven by data and measurements. Before implementing any performance tuning strategies, identify bottlenecks through profiling and ensure you're optimizing for the right metrics.

Exercises

  1. Measure baseline performance of a simple FastAPI application with and without async database calls.
  2. Implement a caching layer using Redis for a read-heavy endpoint.
  3. Create a load test using a tool like Locust to identify bottlenecks in your application.
  4. Compare performance between different database client libraries (e.g., asyncpg vs psycopg2).
  5. Optimize a slow endpoint by applying the techniques learned in this guide.

