FastAPI Performance Tips
FastAPI is already one of the fastest Python frameworks available, but there are still ways to make your applications even more performant. This guide explores practical techniques to optimize your FastAPI applications and ensure they can handle high loads efficiently.
Introduction to FastAPI Performance
FastAPI is built on Starlette and Pydantic, with performance as a core design principle. While it's fast by default, understanding certain optimization strategies can help you squeeze out even more performance, especially for production applications with high traffic.
Performance optimization in FastAPI typically involves:
- Making proper use of async features
- Implementing caching strategies
- Optimizing database interactions
- Fine-tuning deployment configurations
- Minimizing response payload sizes
Let's examine each of these aspects in detail.
Leveraging Asynchronous Operations
FastAPI's support for asynchronous programming is a key performance feature.
When to Use Async
Async operations provide the most benefit when your application:
- Makes network calls (API requests, database queries)
- Performs I/O operations
- Needs to handle many concurrent requests
Basic Async Endpoint Example
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

@app.get("/sync")
def sync_route():
    # This blocks the worker handling this request
    time.sleep(1)
    return {"message": "Processed synchronously"}

@app.get("/async")
async def async_route():
    # This allows the worker to handle other requests during this wait
    await asyncio.sleep(1)
    return {"message": "Processed asynchronously"}
When your server is under load, the async version can handle significantly more concurrent requests: FastAPI runs plain def routes in a limited threadpool, so each blocking call ties up a thread for its full duration, while await hands control back to the event loop during the wait.
Async with Dependencies
Make your dependencies async as well for optimal performance:
from fastapi import FastAPI, Depends
import asyncio

app = FastAPI()

async def get_user_data():
    await asyncio.sleep(0.5)  # Simulate an async database query
    return {"user_id": 123, "name": "FastAPI User"}

@app.get("/users/me")
async def read_user_me(user_data: dict = Depends(get_user_data)):
    return user_data
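Conversely, if you must call blocking code from an async route (for example, a library with no async support), you can push it off the event loop explicitly. Here's a minimal sketch using FastAPI's run_in_threadpool helper, where slow_blocking_call is a hypothetical stand-in for your synchronous function:

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
import time

app = FastAPI()

def slow_blocking_call() -> dict:
    # Hypothetical stand-in for a sync call you can't make async
    time.sleep(1)
    return {"message": "Processed in a threadpool"}

@app.get("/blocking")
async def blocking_route():
    # Runs the sync function in a worker thread, keeping the event loop free
    return await run_in_threadpool(slow_blocking_call)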
Implementing Caching Strategies
Caching is one of the most effective ways to improve API performance.
In-Memory Caching
For simple cases, you can use a Python dictionary:
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

# Simple cache
cache = {}
CACHE_EXPIRY = 60  # seconds

@app.get("/expensive-operation/{item_id}")
async def expensive_operation(item_id: int):
    # Check if the result is in the cache and not expired
    current_time = time.time()
    if item_id in cache and (current_time - cache[item_id]["timestamp"] < CACHE_EXPIRY):
        return {"result": cache[item_id]["data"], "source": "cache"}

    # Simulate an expensive operation
    await asyncio.sleep(2)  # e.g., a complex calculation or database query
    result = {"value": item_id * 100, "computed_at": current_time}

    # Store in cache
    cache[item_id] = {"data": result, "timestamp": current_time}
    return {"result": result, "source": "computed"}
Redis Caching
For production applications, Redis is often a better choice:
from fastapi import FastAPI, Depends
import redis.asyncio as redis
import json
import asyncio

app = FastAPI()

# Set up the Redis connection as a dependency
async def get_redis():
    redis_client = redis.Redis(host="localhost", port=6379, db=0)
    try:
        yield redis_client
    finally:
        await redis_client.close()

@app.get("/data/{item_id}")
async def get_data(item_id: str, redis_client: redis.Redis = Depends(get_redis)):
    # Try to get from cache
    cached_data = await redis_client.get(f"item:{item_id}")
    if cached_data:
        return {"data": json.loads(cached_data), "source": "cache"}

    # Simulate data retrieval
    await asyncio.sleep(1)
    data = {"id": item_id, "name": f"Item {item_id}", "details": "Some computed details"}

    # Store in cache with a 30-second expiration
    await redis_client.setex(f"item:{item_id}", 30, json.dumps(data))
    return {"data": data, "source": "database"}
Optimizing Database Interactions
Database operations are often the biggest bottleneck in API performance.
Using Async Database Drivers
If you're using SQLAlchemy with FastAPI, consider using the async version:
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
import models  # Your SQLAlchemy models

# Database URL for an async connection (note the asyncpg driver)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

app = FastAPI()

async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    finally:
        await db.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async primary-key lookup
    user = await db.get(models.User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user
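If you're on SQLAlchemy 2.0, the dedicated async_sessionmaker factory is a slightly cleaner equivalent to sessionmaker with class_=AsyncSession:

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")
# Produces AsyncSession instances without needing class_=AsyncSession
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)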
Batch Processing
Instead of making multiple individual database queries, batch them:
@app.get("/users")
async def get_multiple_users(user_ids: str, db: AsyncSession = Depends(get_db)):
# Parse comma-separated ids
id_list = [int(id) for id in user_ids.split(",")]
# Single query to get multiple users
from sqlalchemy import select
query = select(models.User).where(models.User.id.in_(id_list))
result = await db.execute(query)
users = result.scalars().all()
return users
Use Database Indexes
Make sure your database tables have appropriate indexes for the queries you run most often. For example, if you frequently query users by email:
CREATE INDEX idx_users_email ON users(email);
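If you manage your schema through SQLAlchemy instead of raw SQL, the same index can be declared on the model. A sketch, assuming a declarative User model along the lines of the models.User used earlier:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    # index=True makes SQLAlchemy emit CREATE INDEX for this column
    email = Column(String, unique=True, index=True)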
Response Optimization Techniques
Use Compression
FastAPI (via Starlette) supports response compression:
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
# Add GZip compression middleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
Pagination for Large Datasets
Always paginate when returning large collections:
from fastapi import FastAPI, Depends, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
import models, schemas

app = FastAPI()

@app.get("/items", response_model=List[schemas.Item])
async def read_items(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    db: AsyncSession = Depends(get_db),  # get_db as defined in the database section
):
    # Fetch only one page of rows instead of the whole table
    result = await db.execute(select(models.Item).offset(skip).limit(limit))
    return result.scalars().all()
Selective Field Returns
Use Pydantic to return only the needed fields:
from fastapi import FastAPI
from typing import List
from pydantic import BaseModel

class ItemBase(BaseModel):
    id: int
    name: str
    description: str
    price: float
    stock: int
    supplier_info: str
    manufacturing_details: str

class ItemBrief(BaseModel):
    id: int
    name: str
    price: float

app = FastAPI()

@app.get("/items", response_model=List[ItemBrief])
async def list_items():
    # Even if the database returns all fields,
    # only id, name, and price will be in the response
    return get_items_from_db()  # placeholder for your data-access function
Deployment Considerations
Use Proper Workers
When deploying with Uvicorn, set the number of workers appropriately:
uvicorn app:app --workers 4
A good starting point is setting workers to (2 x number_of_cores) + 1.
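Rather than hard-coding the count, you can derive it from the machine at startup. A small sketch using the standard library, assuming your application lives in app.py as app:

import multiprocessing

import uvicorn

if __name__ == "__main__":
    # Apply the (2 x cores) + 1 rule of thumb
    workers = (2 * multiprocessing.cpu_count()) + 1
    uvicorn.run("app:app", workers=workers)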
Configure Connection Pooling
For database connections, use connection pooling:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# Create the engine with connection pooling
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,       # Maximum number of persistent connections
    max_overflow=10,    # Extra connections allowed beyond pool_size
    pool_timeout=30,    # Seconds to wait for a free connection
    pool_recycle=1800,  # Recycle connections after 30 minutes
)

AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
Real-world Example: Optimized API Endpoint
Let's put everything together in a real-world example for an e-commerce API:
from fastapi import FastAPI, Depends, Query
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from redis.asyncio import Redis
import json
from typing import List, Literal, Optional

from . import models, schemas, database

app = FastAPI()

# Dependency for the Redis connection
async def get_redis():
    redis_client = Redis(host="localhost", port=6379, db=0)
    try:
        yield redis_client
    finally:
        await redis_client.close()

# Dependency for the database session
async def get_db():
    session = database.AsyncSessionLocal()
    try:
        yield session
    finally:
        await session.close()

@app.get("/products", response_model=List[schemas.ProductBrief])
async def get_products(
    category: Optional[str] = None,
    min_price: Optional[float] = None,
    max_price: Optional[float] = None,
    # Literal constrains the allowed values, so sort_by is safe to getattr below
    sort_by: Literal["price", "popularity", "newest"] = "popularity",
    order: Literal["asc", "desc"] = "desc",
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
    redis: Redis = Depends(get_redis),
):
    # Build a cache key from all query parameters
    cache_key = f"products:{category}:{min_price}:{max_price}:{sort_by}:{order}:{page}:{page_size}"

    # Try to serve from cache first
    cached_data = await redis.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # Build filters from the query parameters
    filters = []
    if category:
        filters.append(models.Product.category == category)
    if min_price is not None:
        filters.append(models.Product.price >= min_price)
    if max_price is not None:
        filters.append(models.Product.price <= max_price)

    # Calculate the offset for pagination
    skip = (page - 1) * page_size

    # Single optimized database query
    sort_column = getattr(models.Product, sort_by)
    query = (
        select(models.Product)
        .where(*filters)
        .order_by(sort_column.desc() if order == "desc" else sort_column.asc())
        .offset(skip)
        .limit(page_size)
    )
    result = await db.execute(query)
    products = result.scalars().all()

    # Convert to the response model
    # (Pydantic v1 style; on v2 use model_validate / model_dump instead)
    response = [schemas.ProductBrief.from_orm(p) for p in products]

    # Cache the result for 5 minutes
    await redis.setex(cache_key, 300, json.dumps([p.dict() for p in response]))
    return response
This example includes:
- Async database access
- Redis caching with a cache key based on all query parameters
- Pagination to limit response size
- Filtering and sorting options
- Response model to limit returned fields
Performance Monitoring
To ensure your optimizations are effective, implement performance monitoring:
from fastapi import FastAPI, Request
import time
import logging
app = FastAPI()
logger = logging.getLogger("api")
@app.middleware("http")
async def log_request_time(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
# Log requests taking longer than 500ms
if process_time > 0.5:
logger.warning(
f"Long request: {request.method} {request.url.path} took {process_time:.4f} seconds"
)
response.headers["X-Process-Time"] = str(process_time)
return response
Summary
FastAPI is designed for performance, but applying these optimization strategies can help you achieve even better results:
- Use async operations when making I/O or network calls
- Implement caching for expensive operations and frequently accessed data
- Optimize database interactions with async drivers, connection pooling, and batch operations
- Minimize response payload using pagination and selective field returns
- Configure deployments properly with the right number of workers
- Monitor performance to identify bottlenecks
Remember that premature optimization can be counterproductive. Start with a clean, maintainable FastAPI application, and apply these performance tips when you've identified specific bottlenecks through profiling and monitoring.
Additional Resources
- FastAPI Documentation on Performance
- Starlette Middleware Documentation
- SQLAlchemy Async Documentation
- Redis Python Client Documentation
- Async Database Drivers
Exercises
- Take an existing synchronous FastAPI endpoint and convert it to use async/await.
- Implement a simple in-memory caching system for a computationally expensive endpoint.
- Add Redis caching to a FastAPI application that queries a database.
- Create a paginated endpoint that returns different fields based on a query parameter.
- Set up performance monitoring middleware that logs slow requests and helps identify potential bottlenecks.