FastAPI Performance Testing

Introduction

Performance testing is a critical aspect of API development, ensuring your FastAPI application can handle expected loads while maintaining acceptable response times. Unlike functional testing, which verifies that your API works correctly, performance testing evaluates how well it performs under various conditions.

In this guide, we'll explore different approaches to performance testing your FastAPI applications, from basic benchmarking to comprehensive load testing. You'll learn how to identify bottlenecks, measure response times, and ensure your API can handle real-world traffic scenarios.

Why Performance Test FastAPI Applications?

Even though FastAPI is designed for high performance (it sits on Starlette and is typically served by Uvicorn, both of which benchmark impressively), your specific implementation might introduce bottlenecks. Performance testing helps you:

  1. Identify slow endpoints or database queries
  2. Determine maximum throughput capacity
  3. Discover memory leaks under sustained load
  4. Establish baseline metrics for future comparisons
  5. Validate that your API meets performance requirements

Basic Response Time Measurement

Let's start with a simple approach to measure endpoint response times using Python's requests library and the built-in time module.

python
import requests
import time
import statistics

def measure_endpoint_performance(url, method="GET", data=None, headers=None, num_requests=100):
    """Measure the response time of an endpoint over multiple requests."""
    response_times = []

    for _ in range(num_requests):
        start_time = time.time()

        if method.upper() == "GET":
            response = requests.get(url, headers=headers)
        elif method.upper() == "POST":
            response = requests.post(url, json=data, headers=headers)
        # Add other methods as needed

        end_time = time.time()
        response_time = (end_time - start_time) * 1000  # Convert to milliseconds
        response_times.append(response_time)

        # Ensure the request was successful
        response.raise_for_status()

        # Add a small delay to avoid overwhelming the server
        time.sleep(0.01)

    return {
        "min": min(response_times),
        "max": max(response_times),
        "avg": statistics.mean(response_times),
        "median": statistics.median(response_times),
        "p95": statistics.quantiles(response_times, n=20)[18],  # 95th percentile
        "num_requests": num_requests
    }

# Example usage
if __name__ == "__main__":
    results = measure_endpoint_performance(
        url="http://localhost:8000/items/",
        num_requests=50
    )

    print("Performance Results:")
    print(f"Min Response Time: {results['min']:.2f}ms")
    print(f"Max Response Time: {results['max']:.2f}ms")
    print(f"Avg Response Time: {results['avg']:.2f}ms")
    print(f"Median Response Time: {results['median']:.2f}ms")
    print(f"95th Percentile: {results['p95']:.2f}ms")

Sample output:

Performance Results:
Min Response Time: 4.23ms
Max Response Time: 15.78ms
Avg Response Time: 6.45ms
Median Response Time: 5.89ms
95th Percentile: 10.33ms

This simple script provides a starting point for measuring endpoint performance. However, for more comprehensive testing, dedicated tools offer better features.
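For a rough sense of how an endpoint behaves under concurrent traffic before reaching for a dedicated tool, you can also fire requests in parallel with asyncio and httpx. This is a minimal sketch rather than a full benchmark; it assumes httpx is installed and reuses the same local /items/ endpoint:

python
import asyncio
import time

import httpx

async def run_concurrent_requests(url, num_requests=200, concurrency=20):
    """Send num_requests GET requests with a bounded level of concurrency."""
    semaphore = asyncio.Semaphore(concurrency)

    async def fetch(client):
        async with semaphore:
            response = await client.get(url)
            response.raise_for_status()

    async with httpx.AsyncClient() as client:
        start = time.time()
        await asyncio.gather(*(fetch(client) for _ in range(num_requests)))
        elapsed = time.time() - start

    print(f"{num_requests} requests in {elapsed:.2f}s "
          f"({num_requests / elapsed:.1f} requests/second)")

if __name__ == "__main__":
    asyncio.run(run_concurrent_requests("http://localhost:8000/items/"))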

Using Locust for Load Testing

Locust is a powerful, easy-to-use load testing tool that allows you to write test scenarios in Python and simulate thousands of users.

Setting Up Locust

First, install Locust:

bash
pip install locust

Now, create a file named locustfile.py with your test scenarios:

python
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(1, 3)  # Wait 1-3 seconds between tasks

    @task(3)  # Weight: this task will be called 3x more often
    def get_items(self):
        with self.client.get("/items/", catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            else:
                response.failure(f"Got status code {response.status_code}")

    @task(1)
    def create_item(self):
        payload = {
            "name": "Test Item",
            "description": "A test item created during load testing",
            "price": 29.99,
            "is_offer": False
        }

        with self.client.post("/items/", json=payload, catch_response=True) as response:
            if response.status_code in (200, 201):
                response.success()
            else:
                response.failure(f"Failed to create item: {response.text}")

    @task(2)
    def get_specific_item(self):
        item_id = 1  # Assuming this item exists
        with self.client.get(f"/items/{item_id}", catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 404:
                response.failure(f"Item {item_id} not found")
            else:
                response.failure(f"Unexpected status: {response.status_code}")

Run Locust with:

bash
locust -H http://localhost:8000

Then open your browser to http://localhost:8089 to access the Locust web interface, where you can:

  1. Set the number of users to simulate
  2. Set the spawn rate (users per second)
  3. Start the test and monitor results in real-time
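For CI pipelines or scripted runs, Locust can also run without the web UI using its standard command-line options. For example (the user count, spawn rate, and duration below are illustrative):

bash
locust --headless -u 100 -r 10 -t 1m -H http://localhost:8000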


Server-Side Profiling with py-spy

Sometimes the bottleneck is in your code rather than external factors. For these cases, profiling helps identify slow functions. py-spy is a sampling profiler that can monitor a running Python process without restarting it.

Install py-spy:

bash
pip install py-spy

Run your FastAPI application, note the process ID, and then run:

bash
py-spy record -o profile.svg --pid YOUR_PROCESS_ID

This generates an interactive SVG flame graph showing where time is spent in your application:

python
# Example FastAPI application with potential performance issues
from fastapi import FastAPI, Depends
import time

app = FastAPI()

def slow_dependency():
    """A deliberately slow dependency"""
    time.sleep(0.2)  # Simulate a slow database query
    return {"data": "processed"}

@app.get("/fast")
async def fast_endpoint():
    return {"message": "This endpoint is fast"}

@app.get("/slow")
async def slow_endpoint(data=Depends(slow_dependency)):
    result = {}
    # Inefficient processing
    for i in range(10000):
        result[i] = i * i
    return {"message": "This endpoint is slow", "data": data}

The flame graph would show that slow_dependency() and the loop in slow_endpoint() consume significant time, making these functions targets for optimization.
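Besides record, py-spy can also show a live, top-like view of the hottest functions in a running process, which is useful for a quick check before generating a full flame graph:

bash
py-spy top --pid YOUR_PROCESS_ID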

Database Performance Testing

Often, database operations are the main bottleneck in API performance. Here's how to test database performance:

python
import time
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# Setup database connection
engine = create_engine("postgresql://user:password@localhost/dbname")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@contextmanager
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def test_query_performance(query, params=None, iterations=100):
    """Test the performance of a specific database query."""
    execution_times = []

    with get_db() as db:
        for _ in range(iterations):
            start_time = time.time()
            result = db.execute(text(query), params or {})
            # Force evaluation of the result
            rows = result.fetchall()
            end_time = time.time()

            execution_times.append((end_time - start_time) * 1000)  # ms

    return {
        "min": min(execution_times),
        "max": max(execution_times),
        "avg": sum(execution_times) / len(execution_times),
        "total_rows": len(rows),
        "iterations": iterations
    }

# Example usage
query_results = test_query_performance(
    "SELECT * FROM items WHERE price > :min_price ORDER BY price DESC",
    {"min_price": 50.0},
    iterations=20
)

print("Query performance:")
print(f"Min execution time: {query_results['min']:.2f}ms")
print(f"Max execution time: {query_results['max']:.2f}ms")
print(f"Avg execution time: {query_results['avg']:.2f}ms")
print(f"Retrieved {query_results['total_rows']} rows")
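When a query is consistently slow, the next step is usually to inspect its execution plan. With PostgreSQL you can prepend EXPLAIN ANALYZE to the query text; here is a minimal sketch reusing the get_db() session setup above (the query and threshold are just examples):

python
with get_db() as db:
    plan = db.execute(
        text("EXPLAIN ANALYZE SELECT * FROM items WHERE price > :min_price"),
        {"min_price": 50.0}
    )
    for row in plan.fetchall():
        print(row[0])  # Each row is one line of the PostgreSQL query plan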

Best Practices for FastAPI Performance

Based on the testing approaches above, here are some best practices for optimizing FastAPI performance:

  1. Use async functions appropriately: For I/O-bound operations (like database queries or HTTP requests), use async/await to improve concurrency.
python
@app.get("/items/")
async def read_items():
    # This is an I/O-bound operation, good for async
    items = await database.fetch_all("SELECT * FROM items")
    return items
  2. Minimize database queries: Use efficient joins and select only needed columns.
python
# Inefficient: Makes N+1 queries
@app.get("/users-with-items/")
async def get_users_with_items():
    users = await database.fetch_all("SELECT * FROM users")
    for user in users:
        # Extra query for each user!
        user.items = await database.fetch_all(
            "SELECT * FROM items WHERE user_id = :user_id",
            {"user_id": user.id}
        )
    return users

# Better: Single join query
@app.get("/users-with-items-optimized/")
async def get_users_with_items_optimized():
    query = """
        SELECT u.id, u.name, u.email, i.id AS item_id, i.name AS item_name
        FROM users u
        LEFT JOIN items i ON u.id = i.user_id
    """
    rows = await database.fetch_all(query)
    # Group the flat rows into one entry per user with a nested list of items
    users = {}
    for row in rows:
        user = users.setdefault(
            row["id"],
            {"id": row["id"], "name": row["name"], "email": row["email"], "items": []},
        )
        if row["item_id"] is not None:
            user["items"].append({"id": row["item_id"], "name": row["item_name"]})
    return list(users.values())
  3. Use connection pooling for databases to reuse connections rather than creating new ones for each request.
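With SQLAlchemy, for example, pooling is configured on the engine; a minimal sketch (the pool sizes below are illustrative, not recommendations):
python
from sqlalchemy import create_engine

# Keep a bounded pool of reusable connections instead of opening one per request
engine = create_engine(
    "postgresql://user:password@localhost/dbname",
    pool_size=10,        # Connections kept open in the pool
    max_overflow=20,     # Additional connections allowed during bursts
    pool_pre_ping=True,  # Verify connections are alive before handing them out
)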

  4. Add appropriate indexes to your database tables based on common query patterns.
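For example, the price filter used earlier would typically benefit from an index on items.price. A sketch using the engine from the previous point (the index name is hypothetical, and conn.commit() assumes SQLAlchemy 2.x):
python
from sqlalchemy import text

# Hypothetical index supporting the "price > :min_price ORDER BY price" query
with engine.connect() as conn:
    conn.execute(text("CREATE INDEX IF NOT EXISTS ix_items_price ON items (price)"))
    conn.commit()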

  5. Implement caching for frequently accessed, rarely changed data:

python
from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis

app = FastAPI()

@app.on_event("startup")
async def startup():
    # RedisBackend expects an async Redis client
    redis_client = aioredis.from_url("redis://localhost", encoding="utf8")
    FastAPICache.init(RedisBackend(redis_client), prefix="fastapi-cache:")

@app.get("/expensive-calculation/{number}")
@cache(expire=60)  # Cache for 60 seconds
async def get_expensive_calculation(number: int):
    # Simulate an expensive calculation
    result = sum(i * i for i in range(number))
    return {"number": number, "result": result}
  6. Enable GZIP compression to reduce response payload sizes, either with Starlette's GZipMiddleware or at a reverse proxy in front of your ASGI server.
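If you handle compression in the application itself, a minimal sketch with the middleware FastAPI re-exports from Starlette (the minimum_size threshold is just an example):
python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

# Compress responses larger than ~1 KB
app.add_middleware(GZipMiddleware, minimum_size=1000)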

  7. Use pagination for endpoints returning large collections:

python
@app.get("/items/")
async def read_items(skip: int = 0, limit: int = 100):
    return await database.fetch_all(
        "SELECT * FROM items LIMIT :limit OFFSET :skip",
        {"skip": skip, "limit": limit}
    )

Real-World Case Study: Optimizing an API Endpoint

Let's walk through a case study of optimizing a slow endpoint:

Original Endpoint:

python
@app.get("/dashboard-stats/")
async def get_dashboard_stats():
    # Multiple separate database queries
    active_users = await db.fetch_one("SELECT COUNT(*) FROM users WHERE is_active = true")
    total_orders = await db.fetch_one("SELECT COUNT(*) FROM orders")
    revenue = await db.fetch_one("SELECT SUM(amount) FROM orders WHERE status = 'completed'")
    popular_products = await db.fetch_all(
        "SELECT product_id, COUNT(*) as order_count FROM order_items GROUP BY product_id ORDER BY order_count DESC LIMIT 5"
    )
    # Convert result rows to plain dicts so product details can be attached below
    popular_products = [dict(product) for product in popular_products]

    # Fetch product details for each popular product
    for product in popular_products:
        product_details = await db.fetch_one(
            "SELECT name, price FROM products WHERE id = :id",
            {"id": product["product_id"]}
        )
        product["name"] = product_details["name"]
        product["price"] = product_details["price"]

    return {
        "active_users": active_users["count"],
        "total_orders": total_orders["count"],
        "revenue": revenue["sum"] or 0.0,
        "popular_products": popular_products
    }

Initial performance test result:

  • Average response time: 850ms
  • Under load (50 concurrent users): 2300ms

Optimized Endpoint:

python
from fastapi_cache.decorator import cache

@app.get("/dashboard-stats/")
@cache(expire=300)  # Cache for 5 minutes
async def get_dashboard_stats():
    # Combine queries into a single statement
    query = """
        SELECT
            (SELECT COUNT(*) FROM users WHERE is_active = true) AS active_users,
            (SELECT COUNT(*) FROM orders) AS total_orders,
            (SELECT COALESCE(SUM(amount), 0) FROM orders WHERE status = 'completed') AS revenue,
            (
                SELECT json_agg(p)
                FROM (
                    SELECT
                        oi.product_id,
                        COUNT(*) AS order_count,
                        pr.name,
                        pr.price
                    FROM order_items oi
                    JOIN products pr ON oi.product_id = pr.id
                    GROUP BY oi.product_id, pr.name, pr.price
                    ORDER BY order_count DESC
                    LIMIT 5
                ) p
            ) AS popular_products
    """

    result = await db.fetch_one(query)

    return {
        "active_users": result["active_users"],
        "total_orders": result["total_orders"],
        "revenue": result["revenue"],
        "popular_products": result["popular_products"] or []
    }

Optimized performance test result:

  • First request average response time: 220ms (74% improvement)
  • Cached response time: 12ms
  • Under load (50 concurrent users): 290ms first request, 15ms cached

This optimization:

  1. Combined multiple queries into a single database transaction
  2. Eliminated the N+1 query problem with product details
  3. Added caching for frequently accessed dashboard data
  4. Used proper null handling with COALESCE

Summary

Performance testing is essential for developing high-quality FastAPI applications. We've explored various techniques from basic response time measurement to comprehensive load testing with Locust and server-side profiling.

Remember these key points:

  1. Start with simple baseline measurements to understand current performance
  2. Use specialized tools like Locust for realistic load testing
  3. Profile your code to find CPU-intensive hotspots
  4. Monitor database performance, often the primary bottleneck
  5. Implement optimizations: caching, query optimization, connection pooling
  6. Test again to verify improvements

By systematically identifying and addressing performance issues, you can ensure your FastAPI application meets the demands of production environments.

Exercises

  1. Create a simple FastAPI application and benchmark two versions of the same endpoint - one using synchronous code and one using asynchronous code.

  2. Write a Locust test for a FastAPI application that simulates users browsing products, adding items to a cart, and completing purchases.

  3. Identify and fix a performance bottleneck in a FastAPI endpoint that fetches related data from multiple database tables.

  4. Implement Redis-based caching for a frequently accessed endpoint and measure the performance improvement.

  5. Create a dashboard that monitors your FastAPI application's performance metrics in real-time.


