FastAPI Task Dependencies
Introduction
When building web applications with FastAPI, you'll often need to perform operations that don't have to finish within the request-response cycle. Background tasks suit these scenarios: the API responds quickly and the work runs after the response has been sent. Task dependencies, a pattern that combines background tasks with FastAPI's dependency injection system, take this further by providing a structured way to organize background processing logic and inject dependencies into it.
In this tutorial, we'll explore how to use FastAPI's dependency injection system with background tasks, creating cleaner, more maintainable, and testable asynchronous code.
Understanding Task Dependencies
Task dependencies allow you to:
- Isolate background processing logic
- Reuse common background processing patterns
- Test background tasks more effectively
- Inject configuration and services into background tasks
Let's start by understanding the basic concepts before diving into more complex examples.
Basic Background Tasks Review
Before we explore dependencies, let's quickly review how basic background tasks work in FastAPI:
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_notification(email: str, message: str):
    with open("notifications.txt", "a") as file:
        file.write(f"Notification for {email}: {message}\n")

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, "Hello World!")
    return {"message": "Notification sent in the background"}
This works, but as your application grows, you might want more structure around background tasks.
Creating Task Dependencies
Let's see how we can use FastAPI's dependency injection system for background tasks:
1. Basic Task Dependency
from fastapi import FastAPI, BackgroundTasks, Depends
from typing import Callable

app = FastAPI()

# A dependency that returns a task function
def get_notification_task() -> Callable:
    def write_notification(email: str, message: str):
        with open("notifications.txt", "a") as file:
            file.write(f"Notification for {email}: {message}\n")
    return write_notification

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks,
    notification_task: Callable = Depends(get_notification_task)
):
    background_tasks.add_task(notification_task, email, "Hello World!")
    return {"message": "Notification sent in the background"}
In this example, we've created a dependency, get_notification_task, that provides the task function. This allows for easier testing and better separation of concerns.
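Because the dependency is an ordinary function that returns an ordinary callable, the task can be exercised without starting the application. The following is a minimal sketch of that idea, assuming a hypothetical make_notification_task factory that takes the output path as a parameter (the original example hard-codes notifications.txt) and a placeholder address:

```python
import tempfile
from pathlib import Path
from typing import Callable


def make_notification_task(path: Path) -> Callable[[str, str], None]:
    """Hypothetical variant of get_notification_task with a configurable path."""
    def write_notification(email: str, message: str) -> None:
        with open(path, "a") as file:
            file.write(f"Notification for {email}: {message}\n")
    return write_notification


def run_demo() -> str:
    # Call the task directly, with the same arguments add_task would pass to it.
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "notifications.txt"
        task = make_notification_task(target)
        task("user@example.com", "Hello World!")
        return target.read_text()


if __name__ == "__main__":
    print(run_demo())
```

Making the path a parameter is a design choice for this sketch only; it keeps the demonstration from writing into the working directory.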
2. Configurable Task Dependencies
Let's make our task dependencies more flexible:
from fastapi import FastAPI, BackgroundTasks, Depends
from typing import Callable
import logging

app = FastAPI()

# Creating a configurable logger dependency
def get_logger():
    logger = logging.getLogger("background_tasks")
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.FileHandler("background_tasks.log")
        handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
    return logger

# Task dependency that uses the logger dependency
def get_notification_task(logger = Depends(get_logger)) -> Callable:
    def write_notification(email: str, message: str):
        logger.info(f"Sending notification to {email}")
        with open("notifications.txt", "a") as file:
            file.write(f"Notification for {email}: {message}\n")
        logger.info(f"Notification sent to {email}")
    return write_notification

@app.post("/send-notification/{email}")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks,
    notification_task: Callable = Depends(get_notification_task)
):
    background_tasks.add_task(notification_task, email, "Hello World!")
    return {"message": "Notification sent in the background"}
Now our get_notification_task dependency depends on the get_logger dependency, showing how dependencies can be chained together.
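One detail worth noting in get_logger is the handler guard. The dependency is resolved again on every request, and logging.getLogger returns the same logger object each time, so without the guard each request would attach another handler and every message would be written several times. A small sketch of that behaviour, assuming an in-memory stream in place of background_tasks.log and a hypothetical logger name:

```python
import io
import logging
from typing import List, Tuple

_stream = io.StringIO()  # stands in for background_tasks.log in this sketch


def get_logger() -> logging.Logger:
    logger = logging.getLogger("background_tasks_demo")  # hypothetical name
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # without this guard, each call adds a handler
        handler = logging.StreamHandler(_stream)
        handler.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
        logger.addHandler(handler)
    return logger


def run_demo() -> Tuple[int, List[str]]:
    # Simulate three requests, each resolving the dependency again.
    for _ in range(3):
        logger = get_logger()
    logger.info("Sending notification")
    return len(logger.handlers), _stream.getvalue().splitlines()
```

Calling run_demo() once should report a single handler and a single log line, even though get_logger ran three times.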
Real-World Example: Email Service with Task Dependencies
Let's build a more comprehensive example that demonstrates the power of task dependencies in a real-world scenario:
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from pydantic import BaseModel, EmailStr
from typing import Callable, List, Dict
import smtplib
from email.message import EmailMessage
import os
from functools import lru_cache

app = FastAPI()

class EmailContent(BaseModel):
    subject: str
    body: str
    recipients: List[EmailStr]

class EmailConfig:
    def __init__(self):
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
        self.sender_email = os.getenv("SENDER_EMAIL", "[email protected]")
        self.sender_password = os.getenv("SENDER_PASSWORD", "your-app-password")

@lru_cache()
def get_email_config() -> EmailConfig:
    return EmailConfig()

def get_email_service(config: EmailConfig = Depends(get_email_config)) -> Callable:
    def send_email(content: EmailContent):
        msg = EmailMessage()
        msg["Subject"] = content.subject
        msg["From"] = config.sender_email
        msg["To"] = ", ".join(content.recipients)
        msg.set_content(content.body)
        try:
            with smtplib.SMTP(config.smtp_server, config.smtp_port) as server:
                server.starttls()
                server.login(config.sender_email, config.sender_password)
                server.send_message(msg)
            print(f"Email sent to {content.recipients}")
        except Exception as e:
            print(f"Failed to send email: {str(e)}")
            # In a real app, you might log this or add to a retry queue
    return send_email

@app.post("/send-email/")
async def send_email_endpoint(
    email_content: EmailContent,
    background_tasks: BackgroundTasks,
    email_service: Callable = Depends(get_email_service)
):
    # Adding the email sending task to background tasks
    background_tasks.add_task(email_service, email_content)
    return {
        "status": "success",
        "message": f"Email will be sent to {len(email_content.recipients)} recipients in the background"
    }
In this example:
- We define an EmailConfig class that loads settings from environment variables
- We create a cached dependency get_email_config that provides the email configuration
- The get_email_service dependency uses the config to create an email sending function
- Our endpoint uses the email_service as a dependency and adds it as a background task
This structure gives us several benefits:
- Configuration is centralized and cached
- Email sending logic is isolated and testable
- The endpoint remains clean and focused
- We can swap implementations of email sending for testing
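The caching comes from functools.lru_cache: the first call builds the EmailConfig and later calls return the same object, so environment changes made afterwards are not picked up until the cache is cleared. A minimal sketch of that behaviour, using a trimmed-down EmailConfig with a single field and a hypothetical host name:

```python
import os
from functools import lru_cache
from typing import Tuple


class EmailConfig:
    def __init__(self) -> None:
        # The environment is read once, when the object is constructed.
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")


@lru_cache()
def get_email_config() -> EmailConfig:
    return EmailConfig()


def run_demo() -> Tuple[bool, str, str]:
    os.environ.pop("SMTP_SERVER", None)
    get_email_config.cache_clear()
    first = get_email_config()
    os.environ["SMTP_SERVER"] = "smtp.example.test"  # hypothetical host
    second = get_email_config()      # cached: same object, change not seen
    get_email_config.cache_clear()   # drop the cached instance
    third = get_email_config()       # rebuilt from the current environment
    os.environ.pop("SMTP_SERVER", None)
    return first is second, second.smtp_server, third.smtp_server
```

Calling cache_clear() between tests is one way to make sure each test sees the environment it set up.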
Advanced: Task Dependencies with Database Access
Let's explore how to use task dependencies with database access:
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session, declarative_base, sessionmaker  # SQLAlchemy 1.4+
from typing import Callable
import sqlalchemy as sa
from datetime import datetime

# Database setup
DATABASE_URL = "sqlite:///./test.db"
# check_same_thread=False lets SQLite connections be used across FastAPI's worker threads
engine = sa.create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class TaskLog(Base):
    __tablename__ = "task_logs"
    id = sa.Column(sa.Integer, primary_key=True, index=True)
    task_name = sa.Column(sa.String, index=True)
    status = sa.Column(sa.String, index=True)
    created_at = sa.Column(sa.DateTime, default=datetime.utcnow)
    completed_at = sa.Column(sa.DateTime, nullable=True)

# Create tables
Base.metadata.create_all(bind=engine)

app = FastAPI()

# Database dependency for request handlers
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Task tracking dependency
def get_task_tracker() -> Callable:
    def track_task(task_name: str):
        # Open the session inside the task: a session from a dependency with
        # yield may already be closed by the time the background task runs
        db = SessionLocal()
        try:
            # Create a log entry when the task starts
            log_entry = TaskLog(task_name=task_name, status="started")
            db.add(log_entry)
            db.commit()
            db.refresh(log_entry)
            try:
                # Perform the actual task
                print(f"Performing task: {task_name}")
                # Update the log entry when the task completes
                log_entry.status = "completed"
                log_entry.completed_at = datetime.utcnow()
                db.commit()
            except Exception as e:
                # Discard partial work, then update the log entry to show the failure
                db.rollback()
                log_entry.status = "failed"
                db.commit()
                print(f"Task failed: {str(e)}")
        finally:
            db.close()
    return track_task

@app.post("/run-task/{task_name}")
async def run_task(
    task_name: str,
    background_tasks: BackgroundTasks,
    task_tracker: Callable = Depends(get_task_tracker)
):
    background_tasks.add_task(task_tracker, task_name)
    return {"message": f"Task '{task_name}' scheduled"}

@app.get("/task-logs/")
async def get_task_logs(db: Session = Depends(get_db)):
    logs = db.query(TaskLog).all()
    return logs
In this example, we've created:
- A SQLAlchemy database model for tracking task execution
- A database dependency that manages the session lifecycle
- A task tracker dependency that logs task execution in the database
- Endpoints to trigger tasks and view task logs
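This pattern lets us record background task execution in the database. Session lifetime matters for long-running tasks: depending on the FastAPI version, a session obtained from a dependency with yield may already be closed when the background task runs, so it is safer for the task to open and close its own session.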
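One way to keep resource handling safe in a long-running task is for the task itself to open and close its connection. The sketch below illustrates this with the standard library's sqlite3 module in place of SQLAlchemy so that it runs on its own; make_task_tracker, the work argument, and the reduced task_logs schema are assumptions made for illustration:

```python
import sqlite3
import tempfile
from contextlib import closing
from pathlib import Path
from typing import Callable, List, Tuple


def make_task_tracker(db_path: Path) -> Callable[[str, Callable[[], None]], None]:
    """Hypothetical factory; the returned task owns its database connection."""
    def track_task(task_name: str, work: Callable[[], None]) -> None:
        # Open the connection inside the task and close it when the task ends.
        with closing(sqlite3.connect(db_path)) as conn:
            conn.execute(
                "CREATE TABLE IF NOT EXISTS task_logs "
                "(id INTEGER PRIMARY KEY, task_name TEXT, status TEXT)"
            )
            cur = conn.execute(
                "INSERT INTO task_logs (task_name, status) VALUES (?, ?)",
                (task_name, "started"),
            )
            conn.commit()
            try:
                work()
                status = "completed"
            except Exception:
                status = "failed"
            conn.execute(
                "UPDATE task_logs SET status = ? WHERE id = ?",
                (status, cur.lastrowid),
            )
            conn.commit()
    return track_task


def _fail() -> None:
    raise RuntimeError("simulated failure")


def run_demo() -> List[Tuple[str, str]]:
    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp) / "tasks.db"
        track_task = make_task_tracker(db_path)
        track_task("resize-image", lambda: None)
        track_task("send-report", _fail)
        with closing(sqlite3.connect(db_path)) as conn:
            return conn.execute(
                "SELECT task_name, status FROM task_logs ORDER BY id"
            ).fetchall()
```

The same shape carries over to SQLAlchemy: create the session at the top of the task and close it in a finally block.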
Best Practices for Task Dependencies
- Keep dependencies focused: Each dependency should have a single responsibility.
- Use layered dependencies: Build complex dependencies by composing simpler ones.
- Handle exceptions: Always handle exceptions in background tasks to prevent silent failures.
- Consider task queues: For more complex needs, consider transitioning to dedicated task queues like Celery or RQ.
- Testing: Write tests for your task dependencies in isolation.
- Resource management: Be careful with database connections and other resources in long-running tasks.
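The exception-handling advice can be applied once rather than in every task. Below is a minimal sketch of a decorator that logs any exception a synchronous task raises; log_failures and flaky_task are hypothetical names, and an async task would need an async wrapper instead:

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("background_tasks")


def log_failures(task: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: log any exception raised by a background task."""
    @functools.wraps(task)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return task(*args, **kwargs)
        except Exception:
            # logger.exception records the message together with the traceback
            logger.exception("Background task %s failed", task.__name__)
            return None
    return wrapper


@log_failures
def flaky_task(email: str) -> str:
    if "@" not in email:
        raise ValueError(f"invalid address: {email!r}")
    return f"sent to {email}"
```

A decorated function can be passed to background_tasks.add_task like any other callable, since functools.wraps preserves its name and signature metadata.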
Testing Task Dependencies
Here's how you might test the task dependencies we've created:
# test_tasks.py
from unittest.mock import MagicMock
from fastapi.testclient import TestClient
from main import app, get_notification_task

client = TestClient(app)

def test_notification_task():
    # Create a mock logger
    mock_logger = MagicMock()
    # Get the task function by calling the dependency directly
    task_func = get_notification_task(mock_logger)
    # Call the task function
    task_func("[email protected]", "Test message")
    # Assert logger was called correctly
    mock_logger.info.assert_any_call("Sending notification to [email protected]")
    mock_logger.info.assert_any_call("Notification sent to [email protected]")

def test_send_notification_endpoint():
    # Replace the dependency with one that returns a mock task.
    # Patching main.get_notification_task would have no effect, because
    # Depends() already holds a reference to the original function.
    mock_task = MagicMock()
    app.dependency_overrides[get_notification_task] = lambda: mock_task
    try:
        # Call the endpoint
        response = client.post("/send-notification/[email protected]")
    finally:
        # Remove the override so other tests see the real dependency
        app.dependency_overrides.clear()
    # Check response
    assert response.status_code == 200
    assert response.json() == {"message": "Notification sent in the background"}
    # TestClient runs background tasks before returning the response,
    # so the mock task has already been called at this point
    mock_task.assert_called_once()
Summary
Combining FastAPI's dependency injection system with background tasks gives you a structured way to organize background processing in your applications. With this pattern, you can:
- Create reusable, testable background task components
- Inject configuration and services into background tasks
- Chain dependencies to build complex processing pipelines
- Properly manage resources like database connections
When your application grows beyond simple background tasks, consider moving to a dedicated task queue solution like Celery or RQ for more robust background processing.
Additional Resources
- FastAPI Official Documentation on Background Tasks
- FastAPI Dependencies Documentation
- Celery - Distributed Task Queue
- Redis Queue (RQ)
Exercises
- Create a task dependency that limits the rate of background tasks using a simple token bucket algorithm.
- Modify the email service example to use a retry mechanism for failed email sending attempts.
- Build a file processing system using task dependencies that can handle uploaded files in the background.
- Create a notification system that can send notifications through multiple channels (email, SMS, push) using composable task dependencies.
- Implement a progress tracking system for long-running background tasks that users can query for updates.