FastAPI Task Performance
Introduction
Background tasks in FastAPI provide a convenient way to perform operations after returning a response to the client. However, as your application scales, understanding how these tasks perform becomes critical to maintaining a responsive and efficient system. This guide explores techniques for optimizing and monitoring the performance of background tasks in FastAPI applications.
Understanding Task Performance Basics
Before diving into optimization, it's important to understand how FastAPI handles background tasks and what factors affect their performance.
How FastAPI Executes Background Tasks
FastAPI background tasks run in the same process as your main application: async task functions execute on the event loop, while sync task functions are dispatched to a thread pool. When you add a task using the BackgroundTasks object, it is queued to run after the response has been sent to the client.
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as log_file:
        log_file.write(message + "\n")

@app.post("/items/")
async def create_item(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "Item created")
    return {"message": "Item created"}
In this example, the write_log function runs after the response {"message": "Item created"} has been returned to the client.
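Note that add_task accepts both sync and async callables: Starlette runs sync functions in a thread pool so they don't block the event loop, while async functions run on the loop itself. A minimal illustration (the endpoint and task names here are hypothetical):

import asyncio
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def sync_task():
    # Runs in Starlette's thread pool after the response is sent
    print("sync task done")

async def async_task():
    # Runs on the event loop after the response is sent
    await asyncio.sleep(0.1)
    print("async task done")

@app.post("/demo/")
async def demo(background_tasks: BackgroundTasks):
    background_tasks.add_task(sync_task)   # Sync callable
    background_tasks.add_task(async_task)  # Async callable
    return {"message": "ok"}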
Measuring Task Performance
Basic Timing Measurement
A simple way to measure task performance is by timing execution:
import time
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def process_data(data: dict):
    start_time = time.time()
    # Simulate processing
    time.sleep(2)
    execution_time = time.time() - start_time
    with open("task_log.txt", "a") as log_file:
        log_file.write(f"Task executed in {execution_time:.2f} seconds\n")

@app.post("/process/")
async def process_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(process_data, data)
    return {"message": "Processing started"}
Advanced Performance Monitoring
For more detailed monitoring, you can use Python's built-in cProfile or external profiling libraries:
import cProfile
import io
import pstats
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def profile_task(func, *args, **kwargs):
    profiler = cProfile.Profile()
    profiler.enable()
    result = func(*args, **kwargs)
    profiler.disable()
    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats(10)  # Print top 10 functions
    with open("profile_log.txt", "a") as log_file:
        log_file.write(s.getvalue() + "\n")
    return result

def heavy_computation(data: dict):
    # Your computationally intensive code here
    result = 0
    for i in range(1000000):
        result += i
    return result

@app.post("/analyze/")
async def analyze_data(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(profile_task, heavy_computation, data)
    return {"message": "Analysis started"}
Optimizing Task Performance
1. Use Asynchronous Functions
For I/O-bound work, converting synchronous tasks to asynchronous functions can significantly improve throughput, since async tasks run directly on the event loop instead of tying up a thread-pool worker:
import asyncio
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def async_process_data(data: dict):
    # Simulate async I/O operations
    await asyncio.sleep(1)  # Non-blocking sleep
    # Process data
    result = data.get("value", 0) * 2
    # Save results (note: a plain open() briefly blocks the event loop)
    with open("results.txt", "a") as f:
        f.write(f"Processed result: {result}\n")

@app.post("/process-async/")
async def process_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(async_process_data, data)
    return {"message": "Async processing started"}
2. Batch Processing
Process multiple items in a single task rather than creating many small tasks; this amortizes per-task overhead such as scheduling and repeated file I/O:
from fastapi import FastAPI, BackgroundTasks
from typing import List

app = FastAPI()

async def process_batch(items: List[dict]):
    results = []
    for item in items:
        # Process each item
        result = item.get("value", 0) * 2
        results.append(result)
    # Save all results at once
    with open("batch_results.txt", "a") as f:
        f.write(f"Batch results: {results}\n")

@app.post("/process-batch/")
async def batch_process(background_tasks: BackgroundTasks, items: List[dict]):
    background_tasks.add_task(process_batch, items)
    return {"message": f"Processing batch of {len(items)} items"}
3. Use External Task Queues for Heavy Workloads
For resource-intensive or long-running tasks, consider a dedicated task queue such as Celery, which executes work in separate worker processes and can persist jobs across application restarts:
from fastapi import FastAPI
from celery_config import process_data_task  # Import your Celery task

app = FastAPI()

@app.post("/heavy-process/")
async def heavy_process_endpoint(data: dict):
    # Send the task to the Celery worker queue
    task = process_data_task.delay(data)
    return {"message": "Processing started", "task_id": task.id}
Monitoring Task Health and Performance
Creating a Health Endpoint
Monitor the health and performance metrics of your background tasks:
import time
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# Global variables for monitoring
task_metrics = {
    "total_tasks": 0,
    "completed_tasks": 0,
    "failed_tasks": 0,
    "avg_execution_time": 0,
    "total_execution_time": 0
}

def update_metrics(execution_time, success=True):
    task_metrics["total_tasks"] += 1
    if success:
        task_metrics["completed_tasks"] += 1
    else:
        task_metrics["failed_tasks"] += 1
    task_metrics["total_execution_time"] += execution_time
    task_metrics["avg_execution_time"] = (
        task_metrics["total_execution_time"] / task_metrics["completed_tasks"]
        if task_metrics["completed_tasks"] > 0 else 0
    )

def tracked_background_task(data):
    start_time = time.time()
    success = True
    try:
        # Actual task work
        time.sleep(1)  # Simulate work
    except Exception:
        success = False
    execution_time = time.time() - start_time
    update_metrics(execution_time, success)

@app.post("/tracked-task/")
async def tracked_task_endpoint(background_tasks: BackgroundTasks, data: dict):
    background_tasks.add_task(tracked_background_task, data)
    return {"message": "Task started with tracking"}

@app.get("/task-metrics/")
async def get_metrics():
    return task_metrics
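One caveat: sync background tasks run in a thread pool, so two tasks can call update_metrics concurrently and interleave its read-modify-write steps. A minimal sketch of serializing updates with a lock, reusing update_metrics from above (update_metrics_safely is a hypothetical name):

import threading

metrics_lock = threading.Lock()

def update_metrics_safely(execution_time: float, success: bool = True):
    # Serialize updates so concurrent tasks cannot corrupt the shared counters
    with metrics_lock:
        update_metrics(execution_time, success)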
Real-World Example: Image Processing Service
Let's build a more practical example: an image processing service that uses background tasks.
import time
import asyncio
import os
import uuid
from fastapi import FastAPI, BackgroundTasks, File, UploadFile
from fastapi.responses import JSONResponse
import aiofiles

app = FastAPI()

# Make sure the upload directory exists
os.makedirs("uploads", exist_ok=True)

# Simulated image processing
async def process_image(image_path: str, processing_type: str):
    start_time = time.time()
    # Simulate different processing times based on complexity
    if processing_type == "thumbnail":
        await asyncio.sleep(1)  # Simpler task
    elif processing_type == "filter":
        await asyncio.sleep(2)  # Medium complexity
    else:
        await asyncio.sleep(3)  # Complex task
    # In a real application, you would use Pillow or another library
    # to actually process the image
    # Log performance
    execution_time = time.time() - start_time
    async with aiofiles.open("image_processing_log.txt", "a") as log:
        await log.write(f"Processed {image_path} with {processing_type} in {execution_time:.2f}s\n")
    return f"processed_{image_path}"

@app.post("/upload-image/")
async def upload_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    process_type: str = "thumbnail"
):
    # Generate a unique filename
    file_extension = file.filename.split(".")[-1]
    file_name = f"{uuid.uuid4()}.{file_extension}"
    file_path = f"uploads/{file_name}"
    # Save the uploaded file
    async with aiofiles.open(file_path, "wb") as out_file:
        content = await file.read()
        await out_file.write(content)
    # Add the processing task to background tasks
    background_tasks.add_task(process_image, file_path, process_type)
    return JSONResponse(
        status_code=202,
        content={
            "message": "Image uploaded and processing started",
            "file_name": file_name,
            "process_type": process_type
        }
    )
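The process_image function above only simulates the work. For the thumbnail case, a real step using Pillow might look like this sketch (make_thumbnail is a hypothetical helper and the 128x128 size is an assumption):

import os
from PIL import Image

def make_thumbnail(image_path: str, size=(128, 128)) -> str:
    # Write a resized copy next to the original file
    root, ext = os.path.splitext(image_path)
    thumb_path = f"{root}_thumb{ext}"
    with Image.open(image_path) as img:
        img.thumbnail(size)  # Resizes in place, preserving aspect ratio
        img.save(thumb_path)
    return thumb_path

Since Pillow calls are blocking, an async task would typically offload this helper with asyncio.to_thread rather than calling it directly on the event loop.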
Performance Tuning Tips for FastAPI Background Tasks
- Limit Concurrent Tasks: Control the number of concurrent tasks to prevent overwhelming your server (see the semaphore sketch at the end of this section).
- Monitor Memory Usage: Background tasks consume memory in the same process as your application, so keep track of usage with tools such as tracemalloc or psutil.
- Separate CPU-bound and I/O-bound Tasks: Handle them differently for optimal performance.
import asyncio
import concurrent.futures

# For I/O-bound tasks, using async is beneficial
async def io_bound_task():
    await asyncio.sleep(1)  # Simulating an I/O operation

# For CPU-bound tasks, consider running in a separate process
def cpu_bound_task():
    # Heavy computation
    result = sum(i * i for i in range(10**7))
    return result

# A long-lived pool; creating one per request (or exiting a `with` block,
# which waits for the work to finish) would block the event loop
process_pool = concurrent.futures.ProcessPoolExecutor()

@app.post("/optimized-task/")
async def optimized_task(background_tasks: BackgroundTasks):
    # The I/O-bound task can run directly as a background task
    background_tasks.add_task(io_bound_task)
    # The CPU-bound task runs in a separate process so it cannot block the event loop
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(process_pool, cpu_bound_task)
    # You can store the future somewhere to check results later
    return {"message": "Tasks scheduled optimally"}
- Use Task Prioritization: Implement a simple priority system for your tasks:
from fastapi import FastAPI
from enum import Enum
import asyncio
import heapq
import itertools

class Priority(int, Enum):
    HIGH = 1
    MEDIUM = 2
    LOW = 3

app = FastAPI()

task_queue = []  # Priority queue for tasks
task_counter = itertools.count()  # Tie-breaker so equal priorities never compare functions

async def task_executor():
    while True:
        if task_queue:
            priority, _, task_func, args = heapq.heappop(task_queue)
            await task_func(*args)
        else:
            await asyncio.sleep(0.1)

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(task_executor())

@app.post("/prioritized-task/")
async def add_prioritized_task(priority: Priority, task_data: dict):
    async def sample_task(data):
        await asyncio.sleep(1)
        print(f"Executed task with data: {data}")
    # Add the task to the priority queue; the counter breaks ties
    heapq.heappush(task_queue, (priority, next(task_counter), sample_task, (task_data,)))
    return {"message": f"Task added with priority {priority.name}"}
Summary
Optimizing background task performance in FastAPI applications involves several key strategies:
- Measure performance to identify bottlenecks
- Use asynchronous functions when appropriate
- Implement batch processing for multiple similar tasks
- Consider external task queues for heavy workloads
- Monitor task health through metrics
- Prioritize tasks based on importance
- Separate CPU-bound and I/O-bound tasks for optimal resource utilization
By applying these techniques, you can ensure your FastAPI background tasks run efficiently even as your application scales.
Additional Resources
- FastAPI Official Documentation on Background Tasks
- AsyncIO Documentation
- Celery Documentation for Python
- Python cProfile Documentation
Exercises
- Implement a background task system that logs its execution time and memory usage.
- Create a priority-based task queue for handling background tasks with different urgency levels.
- Build a simple image processing API that uses background tasks and reports task performance metrics.
- Compare the performance of synchronous vs. asynchronous background tasks for both I/O bound and CPU bound operations.
- Implement a retry mechanism for failed background tasks with exponential backoff.