FastAPI Task Patterns
Introduction
When building web applications with FastAPI, you'll often encounter scenarios where certain operations shouldn't block the main request-response cycle. FastAPI's background tasks feature provides a solution, but knowing common patterns and best practices can dramatically improve your application's performance and reliability.
In this tutorial, we'll explore various patterns for implementing background tasks in FastAPI applications. We'll cover simple tasks, task dependencies, error handling, and more complex scenarios like chaining tasks and handling long-running processes.
Basic Background Task Pattern
The simplest pattern is executing a task after returning a response to the client.
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as log_file:
        log_file.write(f"{message}\n")

@app.post("/items/")
async def create_item(item: dict, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Item created: {item['name']}")
    return {"message": "Item created"}
In this example, the response is sent immediately while the logging happens in the background. The client doesn't need to wait for the log to be written.
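The ordering guarantee is the key point: tasks run only after the response has been sent, in the order they were added. The following toy sketch (stdlib only, using a hypothetical ToyBackgroundTasks class, not FastAPI's actual implementation) illustrates that contract:

```python
# A toy sketch of the "respond first, work later" ordering that
# BackgroundTasks provides: the handler returns its response, and the
# queued callables run afterwards, in the order they were added.
events = []

class ToyBackgroundTasks:
    def __init__(self):
        self.tasks = []

    def add_task(self, func, *args):
        # Queue the callable; nothing runs yet.
        self.tasks.append((func, args))

    def run_all(self):
        # The framework would call this after sending the response.
        for func, args in self.tasks:
            func(*args)

def handler(background_tasks):
    background_tasks.add_task(events.append, "log written")
    events.append("response sent")
    return {"message": "Item created"}

bt = ToyBackgroundTasks()
response = handler(bt)
bt.run_all()
# events is now ["response sent", "log written"]
```

In real FastAPI, Starlette performs the `run_all` step for you, and `add_task` accepts both regular and async functions.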
Task With Dependencies Pattern
You can integrate background tasks with FastAPI's dependency injection system.
from typing import Annotated

from fastapi import FastAPI, BackgroundTasks, Depends

app = FastAPI()

class NotificationService:
    async def send_notification(self, email: str, message: str):
        # Simulating sending an email
        print(f"Sending email to {email}: {message}")
        # In a real app, you would use an email service here

def get_notification_service():
    return NotificationService()

@app.post("/subscribe/")
async def subscribe(
    email: str,
    background_tasks: BackgroundTasks,
    notification_service: Annotated[NotificationService, Depends(get_notification_service)],
):
    background_tasks.add_task(
        notification_service.send_notification,
        email,
        "Thanks for subscribing to our newsletter!",
    )
    return {"message": "Subscription successful"}
This pattern allows you to inject services that your background tasks depend on, making your code more modular and testable.
Error Handling Pattern
Background tasks run after the response is sent, so errors won't affect the response. However, you need a way to handle errors that occur during task execution.
import logging

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()
logger = logging.getLogger("app")

def process_with_error_handling(item_id: int):
    try:
        # Simulate processing
        if item_id % 2 == 0:
            raise ValueError(f"Cannot process even item_id: {item_id}")
        # Normal processing
        print(f"Processing item {item_id}")
    except Exception as e:
        logger.error(f"Background task error: {e}")
        # You might want to store the error in a database or notify someone

@app.post("/process/{item_id}")
def process_item(item_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_with_error_handling, item_id)
    return {"message": f"Processing of item {item_id} started"}
Always wrap your background task logic in try-except blocks to prevent crashes and to log errors properly.
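Rather than repeating the try-except in every task, you can factor it into a decorator. Here's a minimal stdlib sketch (the `background_safe` name and the swallow-and-log policy are my own choices, not a FastAPI convention):

```python
import functools
import logging

logger = logging.getLogger("app")

def background_safe(func):
    """Catch and log any exception raised by a background task,
    so one failing task can't crash the worker or go unnoticed."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the full traceback
            logger.exception("Background task %s failed", func.__name__)
            return None
    return wrapper

@background_safe
def risky_task(item_id: int) -> str:
    if item_id % 2 == 0:
        raise ValueError(f"Cannot process even item_id: {item_id}")
    return f"processed {item_id}"

# risky_task(3) returns "processed 3"; risky_task(2) logs the
# ValueError and returns None instead of raising.
```

Any function wrapped this way can be passed straight to `background_tasks.add_task` without further ceremony.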
Task Queue Pattern
For more complex scenarios, you can hand work off to a task queue, using libraries like Celery or Redis Queue (RQ), or a simple in-process queue built on Python's asyncio.
import asyncio

from fastapi import FastAPI

app = FastAPI()
task_queue: asyncio.Queue = asyncio.Queue()

# A consumer coroutine running in the same event loop; in production you
# would typically move heavy work to a separate worker process instead.
async def process_queue():
    while True:
        task_func, args = await task_queue.get()
        try:
            await task_func(*args)
        except Exception as e:
            print(f"Error processing task: {e}")
        finally:
            task_queue.task_done()

@app.on_event("startup")  # newer FastAPI versions prefer the lifespan handler
async def startup_event():
    # Keep a reference so the worker task isn't garbage-collected
    app.state.queue_worker = asyncio.create_task(process_queue())

async def complex_processing(user_id: int, data: dict):
    # Simulate complex processing
    await asyncio.sleep(10)  # Long operation
    print(f"Processed data for user {user_id}: {data}")

@app.post("/process-data/")
async def process_data(user_id: int, data: dict):
    # Add to the queue instead of using BackgroundTasks directly
    await task_queue.put((complex_processing, (user_id, data)))
    return {"message": "Data queued for processing"}
This pattern is useful when you need more control over task execution, such as limiting concurrent tasks or prioritizing certain tasks.
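For instance, limiting concurrency can be done with an asyncio.Semaphore. This is a stdlib-only sketch (the `run_with_limit` helper is mine, not a FastAPI or asyncio API): at most `limit` tasks run at once, and the rest wait their turn.

```python
import asyncio

async def run_with_limit(coros, limit: int = 2):
    """Run the given coroutines, allowing at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)
    results = []

    async def guarded(coro):
        # Each coroutine must acquire the semaphore before running.
        async with semaphore:
            results.append(await coro)

    await asyncio.gather(*(guarded(c) for c in coros))
    return results

async def job(n: int) -> int:
    await asyncio.sleep(0.01)  # simulate work
    return n * 2

results = asyncio.run(run_with_limit([job(i) for i in range(5)], limit=2))
# results contains 0, 2, 4, 6, 8 (completion order may vary)
```

The same semaphore idea plugs directly into the queue worker above: acquire it inside `process_queue` before awaiting `task_func`.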
Task Progress Tracking Pattern
For long-running tasks, users often want to know the progress. You can implement a pattern to track and report task progress.
import asyncio
import uuid

from fastapi import FastAPI, BackgroundTasks, HTTPException

app = FastAPI()

# In-memory storage for task status (use Redis or a database in production)
task_status = {}

async def process_with_progress(task_id: str, steps: int):
    try:
        for i in range(steps):
            # Update progress
            progress_percent = int((i / steps) * 100)
            task_status[task_id] = {"status": "processing", "progress": progress_percent}
            # Simulate work
            await asyncio.sleep(1)
        # Mark as complete
        task_status[task_id] = {"status": "completed", "progress": 100}
    except Exception as e:
        task_status[task_id] = {"status": "failed", "error": str(e)}

@app.post("/start-task/")
async def start_task(steps: int, background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    task_status[task_id] = {"status": "started", "progress": 0}
    background_tasks.add_task(process_with_progress, task_id, steps)
    return {"task_id": task_id, "message": "Task started"}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    if task_id not in task_status:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_status[task_id]
This pattern enables you to start a task and then check its progress through a separate endpoint.
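On the client side, this usually means polling until the task reaches a terminal state. Here is an in-process stdlib simulation of that flow (the `worker` and `poll_until_done` names are mine; a real client would hit the `/task-status/{task_id}` endpoint over HTTP instead of reading the dict directly):

```python
import asyncio
import uuid

# Shared status store, mirroring the task_status dict above.
task_status = {}

async def worker(task_id: str, steps: int) -> None:
    # Same shape as process_with_progress, sped up for the demo.
    for i in range(steps):
        task_status[task_id] = {"status": "processing",
                                "progress": int(i / steps * 100)}
        await asyncio.sleep(0.01)  # simulate work
    task_status[task_id] = {"status": "completed", "progress": 100}

async def poll_until_done(task_id: str, interval: float = 0.02) -> dict:
    # Poll until the task reaches a terminal state.
    while True:
        status = task_status.get(task_id, {})
        if status.get("status") in ("completed", "failed"):
            return status
        await asyncio.sleep(interval)

async def main() -> dict:
    task_id = str(uuid.uuid4())
    task_status[task_id] = {"status": "started", "progress": 0}
    asyncio.create_task(worker(task_id, 5))
    return await poll_until_done(task_id)

result = asyncio.run(main())
# result is {"status": "completed", "progress": 100}
```

In production, prefer a sensible polling interval (or server-sent events / WebSockets) over tight polling loops.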
Chained Tasks Pattern
Sometimes you need to execute a series of background tasks where one depends on the previous one.
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def process_image(image_id: int):
    print(f"Processing image {image_id}...")
    # Simulate image processing
    return f"processed_image_{image_id}.jpg"

async def create_thumbnail(processed_image_path: str):
    print(f"Creating thumbnail for {processed_image_path}...")
    # Simulate thumbnail creation
    return f"thumb_{processed_image_path}"

async def update_database(image_id: int, thumbnail_path: str):
    print(f"Updating database: image_id={image_id}, thumbnail={thumbnail_path}")
    # Simulate database update

async def process_image_pipeline(image_id: int):
    # Chain of operations: each step feeds the next
    processed_image = await process_image(image_id)
    thumbnail_path = await create_thumbnail(processed_image)
    await update_database(image_id, thumbnail_path)

@app.post("/upload-image/{image_id}")
async def upload_image(image_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_image_pipeline, image_id)
    return {"message": f"Image {image_id} uploaded and processing started"}
This pattern allows you to create a sequence of operations that run in the background while keeping the code organized.
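If you chain tasks often, the "each step feeds the next" logic can be generalized. A stdlib sketch (the `run_pipeline` helper is my own, not part of FastAPI):

```python
import asyncio

async def run_pipeline(initial, *steps):
    """Feed `initial` through each async step, passing along results."""
    value = initial
    for step in steps:
        value = await step(value)
    return value

# Simplified single-argument versions of the steps above.
async def process_image(image_id: int) -> str:
    return f"processed_image_{image_id}.jpg"

async def create_thumbnail(path: str) -> str:
    return f"thumb_{path}"

result = asyncio.run(run_pipeline(7, process_image, create_thumbnail))
# result is "thumb_processed_image_7.jpg"
```

This only works when each step takes exactly the previous result; steps that need extra context (like `update_database` above, which needs both `image_id` and the thumbnail path) are better wrapped with `functools.partial` or kept in an explicit pipeline function.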
Periodic Tasks Pattern
Some tasks need to run periodically. While FastAPI doesn't have built-in scheduling, you can integrate with libraries like APScheduler.
import asyncio

from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

app = FastAPI()
scheduler = AsyncIOScheduler()

async def cleanup_old_files():
    print("Cleaning up old files...")
    # Simulate cleanup operation
    await asyncio.sleep(2)
    print("Cleanup completed")

async def generate_daily_report():
    print("Generating daily report...")
    # Simulate report generation
    await asyncio.sleep(5)
    print("Report generated")

@app.on_event("startup")  # newer FastAPI versions prefer the lifespan handler
def startup_scheduler():
    # Run cleanup at the start of every hour
    scheduler.add_job(cleanup_old_files, CronTrigger(hour="*"))
    # Run report generation daily at midnight
    scheduler.add_job(generate_daily_report, CronTrigger(hour=0, minute=0))
    scheduler.start()

@app.on_event("shutdown")
def shutdown_scheduler():
    scheduler.shutdown()

@app.get("/")
async def root():
    return {"message": "Scheduler is running"}
This pattern is useful for maintenance tasks, data aggregation, or any regular operation that doesn't need to be triggered by a user request.
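If you'd rather avoid an extra dependency, a minimal periodic loop can be built with plain asyncio. This sketch (the `every` helper is mine; APScheduler offers far more, e.g. cron expressions and missed-run handling) runs a coroutine at a fixed interval:

```python
import asyncio

async def every(seconds: float, task, repeats=None):
    """Run `task` repeatedly, sleeping `seconds` between runs.
    `repeats=None` loops forever; a number stops after that many runs."""
    count = 0
    while repeats is None or count < repeats:
        await task()
        count += 1
        await asyncio.sleep(seconds)

calls = []

async def cleanup():
    calls.append("cleanup")

# Bounded run for demonstration; in an app you would launch this with
# asyncio.create_task(every(3600, cleanup_old_files)) at startup.
asyncio.run(every(0.01, cleanup, repeats=3))
# calls now holds three "cleanup" entries
```

Note this runs the task then sleeps, so a slow task stretches the effective interval; APScheduler's triggers schedule against wall-clock time instead.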
Summary
We've explored several useful patterns for implementing background tasks in FastAPI:
- Basic Background Task Pattern - Simple non-blocking operations
- Task With Dependencies Pattern - Using dependency injection with tasks
- Error Handling Pattern - Properly handling exceptions in background tasks
- Task Queue Pattern - Managing multiple tasks with a queue
- Task Progress Tracking Pattern - Tracking and reporting task status
- Chained Tasks Pattern - Creating sequences of dependent operations
- Periodic Tasks Pattern - Scheduling recurring tasks
By applying these patterns, you can create more responsive, resilient, and maintainable FastAPI applications that handle complex operations without blocking the main request-response cycle.
Additional Resources
- FastAPI Official Documentation on Background Tasks
- APScheduler Documentation
- Celery Project - For more complex task queuing
Exercises
- Implement a background task that processes an uploaded image and sends an email when it's done.
- Create a task progress tracking system that stores progress in Redis instead of in memory.
- Implement a batch processing endpoint that queues multiple tasks and returns a batch ID for tracking all tasks.
- Build a rate-limited background task system that only allows a certain number of tasks to run concurrently.
- Create a priority queue for background tasks where some tasks can jump ahead of others in the queue.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)