
FastAPI Task Performance

Introduction

Background tasks in FastAPI provide a convenient way to perform operations after returning a response to the client. However, as your application scales, understanding how these tasks perform becomes critical to maintaining a responsive and efficient system. This guide explores techniques for optimizing and monitoring the performance of background tasks in FastAPI applications.

Understanding Task Performance Basics

Before diving into optimization, it's important to understand how FastAPI handles background tasks and what factors affect their performance.

How FastAPI Executes Background Tasks

FastAPI background tasks run in the same process as your main application, not in a separate worker. When you add a task using the BackgroundTasks object, it is queued and executed after the response is sent to the client: async task functions run directly on the event loop, while regular (def) functions run in the application's thread pool.

```python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as log_file:
        log_file.write(message + "\n")

@app.post("/items/")
async def create_item(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Item created")
    return {"message": "Item created"}
```

In this example, the write_log function runs after the response {"message": "Item created"} is returned to the client.

Measuring Task Performance

Basic Timing Measurement

A simple way to measure task performance is by timing execution:

```python
import time
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def process_data(data: dict):
    start_time = time.time()

    # Simulate processing
    time.sleep(2)

    execution_time = time.time() - start_time
    with open("task_log.txt", "a") as log_file:
        log_file.write(f"Task executed in {execution_time:.2f} seconds\n")

@app.post("/process/")
async def process_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(process_data, data)
    return {"message": "Processing started"}
```
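The timing boilerplate can be factored into a reusable decorator so every task logs its own duration. This is a sketch; `timed_task` and the log path are illustrative names, not part of any FastAPI API:

```python
import functools
import time

def timed_task(func):
    """Wrap a task function and record its wall-clock execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            with open("task_log.txt", "a") as log_file:
                log_file.write(f"{func.__name__} ran in {elapsed:.2f}s\n")
    return wrapper

@timed_task
def process_data(data: dict):
    time.sleep(0.1)  # stand-in for real work
    return data.get("value", 0) * 2
```

Decorated tasks are registered exactly as before: `background_tasks.add_task(process_data, data)`.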

Advanced Performance Monitoring

For more detailed monitoring, you can use Python's cProfile or external libraries:

```python
import cProfile
import io
import pstats
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def profile_task(func, *args, **kwargs):
    profiler = cProfile.Profile()
    profiler.enable()

    result = func(*args, **kwargs)

    profiler.disable()
    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(10)  # Print top 10 functions

    with open("profile_log.txt", "a") as log_file:
        log_file.write(s.getvalue() + "\n")

    return result

def heavy_computation(data: dict):
    # Your computationally intensive code here
    result = 0
    for i in range(1000000):
        result += i
    return result

@app.post("/analyze/")
async def analyze_data(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(profile_task, heavy_computation, data)
    return {"message": "Analysis started"}
```
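cProfile shows where CPU time goes; peak memory can be captured the same way with the standard library's tracemalloc. A sketch following the same wrapper pattern (`profile_memory` is an illustrative name):

```python
import tracemalloc

def profile_memory(func, *args, **kwargs):
    """Run a task while recording its peak memory allocation."""
    tracemalloc.start()
    try:
        result = func(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    with open("memory_log.txt", "a") as log_file:
        log_file.write(f"{func.__name__}: peak {peak / 1024:.1f} KiB\n")
    return result

def build_list(n: int):
    # Allocation-heavy stand-in for a real task
    return [i * i for i in range(n)]
```

It plugs into background tasks the same way: `background_tasks.add_task(profile_memory, build_list, 100_000)`.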

Optimizing Task Performance

1. Use Asynchronous Functions

For I/O-bound work, writing tasks as async functions can significantly improve throughput: awaiting I/O yields control to the event loop instead of tying up a thread pool worker:

```python
import asyncio
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def async_process_data(data: dict):
    # Simulate async I/O operations
    await asyncio.sleep(1)  # Non-blocking sleep

    # Process data
    result = data.get("value", 0) * 2

    # Save results (note: open() is a blocking call; for heavy file I/O,
    # an async library such as aiofiles keeps the event loop free)
    with open("results.txt", "a") as f:
        f.write(f"Processed result: {result}\n")

@app.post("/process-async/")
async def process_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(async_process_data, data)
    return {"message": "Async processing started"}
```
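An async task can also overlap several I/O waits with asyncio.gather, so multiple simulated one-second calls cost roughly one second in total rather than running back to back. A minimal sketch (the function names and simulated delay are illustrative):

```python
import asyncio

async def fetch_one(item_id: int) -> int:
    await asyncio.sleep(0.1)  # simulated network or disk wait
    return item_id * 2

async def fetch_all(item_ids: list[int]) -> list[int]:
    # All the waits run concurrently on the event loop;
    # gather preserves the input order in its results.
    return await asyncio.gather(*(fetch_one(i) for i in item_ids))
```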

2. Batch Processing

Process multiple items in a single task rather than creating many small tasks:

```python
from fastapi import FastAPI, BackgroundTasks
from typing import List

app = FastAPI()

async def process_batch(items: List[dict]):
    results = []
    for item in items:
        # Process each item
        result = item.get("value", 0) * 2
        results.append(result)

    # Save all results at once
    with open("batch_results.txt", "a") as f:
        f.write(f"Batch results: {results}\n")

@app.post("/process-batch/")
async def batch_process(background_tasks: BackgroundTasks, items: List[dict]):
    background_tasks.add_task(process_batch, items)
    return {"message": f"Processing batch of {len(items)} items"}
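If a batch can be arbitrarily large, capping the chunk size bounds memory use and keeps each unit of work small. A sketch of a chunking helper (the chunk size of 100 is an arbitrary illustration):

```python
from typing import Iterator, List

def chunked(items: List[dict], size: int) -> Iterator[List[dict]]:
    """Yield successive fixed-size slices of a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

async def process_in_chunks(items: List[dict], size: int = 100) -> List[int]:
    results: List[int] = []
    for chunk in chunked(items, size):
        # Same doubling logic as process_batch, one chunk at a time
        results.extend(item.get("value", 0) * 2 for item in chunk)
    return results
```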

3. Use External Task Queues for Heavy Workloads

For resource-intensive tasks, consider using dedicated task queues like Celery:

```python
from fastapi import FastAPI
from celery_config import process_data_task  # Import your Celery task

app = FastAPI()

@app.post("/heavy-process/")
async def heavy_process_endpoint(data: dict):
    # Send task to Celery
    task = process_data_task.delay(data)
    return {"message": "Processing started", "task_id": task.id}
```
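The import above assumes a `celery_config` module exists. A minimal version of that module might look like the following sketch; the broker URL, backend URL, and task body are placeholders, not part of the original example, and a running broker (e.g. Redis) plus a Celery worker are required for it to actually execute tasks:

```python
# celery_config.py -- hypothetical module backing the import above
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",   # placeholder broker URL
    backend="redis://localhost:6379/0",  # placeholder result backend
)

@celery_app.task
def process_data_task(data: dict) -> int:
    # Heavy work runs in a separate Celery worker process,
    # completely outside the FastAPI server.
    return data.get("value", 0) * 2
```

A worker would then be started separately, e.g. `celery -A celery_config worker`.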

Monitoring Task Health and Performance

Creating a Health Endpoint

Monitor the health and performance metrics of your background tasks:

```python
import time
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# Global variables for monitoring
task_metrics = {
    "total_tasks": 0,
    "completed_tasks": 0,
    "failed_tasks": 0,
    "avg_execution_time": 0,
    "total_execution_time": 0
}

def update_metrics(execution_time, success=True):
    task_metrics["total_tasks"] += 1
    if success:
        task_metrics["completed_tasks"] += 1
    else:
        task_metrics["failed_tasks"] += 1

    task_metrics["total_execution_time"] += execution_time
    task_metrics["avg_execution_time"] = (
        task_metrics["total_execution_time"] / task_metrics["completed_tasks"]
        if task_metrics["completed_tasks"] > 0 else 0
    )

def tracked_background_task(data):
    start_time = time.time()
    success = True

    try:
        # Actual task work
        time.sleep(1)  # Simulate work
    except Exception:
        success = False

    execution_time = time.time() - start_time
    update_metrics(execution_time, success)

@app.post("/tracked-task/")
async def tracked_task_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(tracked_background_task, data)
    return {"message": "Task started with tracking"}

@app.get("/task-metrics/")
async def get_metrics():
    return task_metrics
```
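One caveat: synchronous background tasks run on a thread pool, so two tasks can update the shared dictionary at the same time and lose increments. A lock-protected variant (a sketch; `update_metrics_safe` is an illustrative name) makes each read-modify-write atomic:

```python
import threading

metrics_lock = threading.Lock()
task_metrics = {
    "total_tasks": 0,
    "completed_tasks": 0,
    "failed_tasks": 0,
    "total_execution_time": 0.0,
}

def update_metrics_safe(execution_time: float, success: bool = True):
    # The lock serializes updates from concurrent thread-pool workers,
    # so no increment is lost to an interleaved read-modify-write.
    with metrics_lock:
        task_metrics["total_tasks"] += 1
        key = "completed_tasks" if success else "failed_tasks"
        task_metrics[key] += 1
        task_metrics["total_execution_time"] += execution_time
```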

Real-World Example: Image Processing Service

Let's build a more practical example - an image processing service that uses background tasks:

```python
import time
import asyncio
import os
import uuid
from fastapi import FastAPI, BackgroundTasks, File, UploadFile
from fastapi.responses import JSONResponse
import aiofiles

app = FastAPI()

os.makedirs("uploads", exist_ok=True)  # ensure the upload directory exists

# Simulated image processing
async def process_image(image_path: str, processing_type: str):
    start_time = time.time()

    # Simulate different processing times based on complexity
    if processing_type == "thumbnail":
        await asyncio.sleep(1)  # Simpler task
    elif processing_type == "filter":
        await asyncio.sleep(2)  # Medium complexity
    else:
        await asyncio.sleep(3)  # Complex task

    # In a real application, you would use Pillow (PIL) or another
    # library to actually process the image here

    # Log performance
    execution_time = time.time() - start_time
    async with aiofiles.open("image_processing_log.txt", "a") as log:
        await log.write(f"Processed {image_path} with {processing_type} in {execution_time:.2f}s\n")

    return f"processed_{image_path}"

@app.post("/upload-image/")
async def upload_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    process_type: str = "thumbnail"
):
    # Generate a unique filename
    file_extension = file.filename.split(".")[-1]
    file_name = f"{uuid.uuid4()}.{file_extension}"
    file_path = f"uploads/{file_name}"

    # Save the uploaded file
    async with aiofiles.open(file_path, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)

    # Add the processing task to background tasks
    background_tasks.add_task(process_image, file_path, process_type)

    return JSONResponse(
        status_code=202,
        content={
            "message": "Image uploaded and processing started",
            "file_name": file_name,
            "process_type": process_type
        }
    )
```

Performance Tuning Tips for FastAPI Background Tasks

  1. Limit Concurrent Tasks: Control the number of concurrent tasks to prevent overwhelming your server.

  2. Monitor Memory Usage: Background tasks consume memory, so keep track of memory usage.

  3. Separate CPU-bound and IO-bound Tasks: Handle them differently for optimal performance.

```python
import asyncio
import concurrent.futures

# A long-lived pool avoids paying process-startup cost on every request
process_pool = concurrent.futures.ProcessPoolExecutor()

# For I/O bound tasks, using async is beneficial
async def io_bound_task():
    await asyncio.sleep(1)  # Simulating I/O operation

# For CPU bound tasks, run in a separate process
def cpu_bound_task():
    # Heavy computation
    return sum(i * i for i in range(10**7))

@app.post("/optimized-task/")
async def optimized_task(background_tasks: BackgroundTasks):
    # I/O bound task can run directly as a background task
    background_tasks.add_task(io_bound_task)

    # For the CPU bound task, run_in_executor hands it to the process
    # pool and returns a future without blocking the event loop.
    # (A `with ProcessPoolExecutor()` block here would wait for the
    # task to finish on exit, blocking the request.)
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(process_pool, cpu_bound_task)
    # You can store the future somewhere to check results later

    return {"message": "Tasks scheduled optimally"}
```
  4. Use Task Prioritization: Implement a simple priority system for your tasks:
```python
from fastapi import FastAPI
from enum import Enum
import asyncio
import heapq
import itertools

class Priority(int, Enum):
    HIGH = 1
    MEDIUM = 2
    LOW = 3

app = FastAPI()
task_queue = []  # Priority queue for tasks
task_counter = itertools.count()  # tie-breaker: functions aren't comparable

async def task_executor():
    while True:
        if task_queue:
            priority, _, task_func, args = heapq.heappop(task_queue)
            await task_func(*args)
        else:
            await asyncio.sleep(0.1)

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(task_executor())

@app.post("/prioritized-task/")
async def add_prioritized_task(priority: Priority, task_data: dict):
    async def sample_task(data):
        await asyncio.sleep(1)
        print(f"Executed task with data: {data}")

    # The counter breaks ties so two entries with equal priority never
    # fall through to comparing the (incomparable) task functions
    heapq.heappush(task_queue, (priority, next(task_counter), sample_task, (task_data,)))
    return {"message": f"Task added with priority {priority.name}"}
```

Summary

Optimizing background task performance in FastAPI applications involves several key strategies:

  1. Measure performance to identify bottlenecks
  2. Use asynchronous functions when appropriate
  3. Implement batch processing for multiple similar tasks
  4. Consider external task queues for heavy workloads
  5. Monitor task health through metrics
  6. Prioritize tasks based on importance
  7. Separate CPU-bound and IO-bound tasks for optimal resource utilization

By applying these techniques, you can ensure your FastAPI background tasks run efficiently even as your application scales.


Exercises

  1. Implement a background task system that logs its execution time and memory usage.
  2. Create a priority-based task queue for handling background tasks with different urgency levels.
  3. Build a simple image processing API that uses background tasks and reports task performance metrics.
  4. Compare the performance of synchronous vs. asynchronous background tasks for both I/O bound and CPU bound operations.
  5. Implement a retry mechanism for failed background tasks with exponential backoff.

