FastAPI Task Parameters
In your FastAPI applications, background tasks allow you to perform operations asynchronously after returning a response to the client. One powerful feature of FastAPI's background tasks system is the ability to pass parameters to these tasks. This enables more flexible and reusable background operations.
Introduction to Task Parameters
When working with background tasks in FastAPI, you often need to customize the task's behavior based on request data or other contextual information. Task parameters let you:
- Pass data from your route handlers to background tasks
- Make your background task functions reusable across different routes
- Customize task behavior without creating multiple similar functions
Let's explore how to effectively work with task parameters in FastAPI.
Basic Parameter Passing
At its core, passing parameters to background tasks is straightforward. You can provide any arguments to your task function when adding it to the BackgroundTasks
object.
Example 1: Simple Parameter Passing
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def process_item(item_id: int, action: str):
print(f"Processing item {item_id} with action: {action}")
# Perform some background processing
@app.post("/items/{item_id}")
async def create_item(item_id: int, background_tasks: BackgroundTasks):
# Add the background task with parameters
background_tasks.add_task(process_item, item_id, "create")
return {"message": "Item creation initiated"}
In this example, we're passing two parameters to our process_item
background task:
- The
item_id
from the path parameter - A static string
"create"
indicating the action type
Multiple Parameter Types
You can pass various types of parameters to your background tasks:
Example 2: Using Different Parameter Types
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, List
import time
app = FastAPI()
class Item(BaseModel):
name: str
description: str = None
price: float
tags: List[str] = []
def process_complex_task(
item: Item,
user_id: int,
metadata: Dict[str, str],
importance: int = 1
):
print(f"Processing {item.name} for user {user_id}")
print(f"Metadata: {metadata}")
print(f"Importance level: {importance}")
# Simulate work based on importance
time.sleep(importance * 2)
# Processing logic here...
@app.post("/process/")
async def process_request(
item: Item,
user_id: int,
background_tasks: BackgroundTasks,
priority: int = 1
):
# Create some metadata
metadata = {
"timestamp": str(time.time()),
"source": "api_request"
}
# Add background task with multiple parameter types
background_tasks.add_task(
process_complex_task,
item=item, # Pydantic model
user_id=user_id, # Integer
metadata=metadata, # Dictionary
importance=priority # Optional parameter with default
)
return {"status": "processing", "priority": priority}
In this example, we're passing:
- A Pydantic model (
Item
) - A primitive type (
user_id
as an integer) - A dictionary (
metadata
) - An optional parameter with a default value (
importance
)
Accessing Request Information in Tasks
Often, you need to access information from the request context in your background tasks.
Example 3: Passing Request Information to Tasks
from fastapi import FastAPI, BackgroundTasks, Request, Depends, Header
from typing import Optional
app = FastAPI()
async def log_request(
path: str,
query_params: dict,
user_agent: str,
client_ip: str
):
# In a real application, you might write this to a database or log file
print(f"Request log - Path: {path}")
print(f"Query parameters: {query_params}")
print(f"User agent: {user_agent}")
print(f"Client IP: {client_ip}")
@app.get("/logged-endpoint")
async def logged_endpoint(
request: Request,
background_tasks: BackgroundTasks,
user_agent: Optional[str] = Header(None)
):
# Extract information from the request
path = request.url.path
query_params = dict(request.query_params)
client_ip = request.client.host
# Add the background task with request information
background_tasks.add_task(
log_request,
path=path,
query_params=query_params,
user_agent=user_agent,
client_ip=client_ip
)
return {"message": "This request will be logged"}
This example demonstrates how to:
- Extract information from the request object
- Pass that information to a background task
- Process the information asynchronously after the response is sent
Working with Database Dependencies
A common use case involves passing database connections or repositories to background tasks.
Example 4: Passing Database Dependencies to Tasks
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from typing import List
from .database import get_db
from .models import User, Notification
app = FastAPI()
async def send_notifications(
user_ids: List[int],
message: str,
db: Session
):
"""Send notifications to multiple users in the background"""
for user_id in user_ids:
user = db.query(User).filter(User.id == user_id).first()
if user:
# Create notification record
notification = Notification(
user_id=user_id,
message=message,
is_read=False
)
db.add(notification)
# In a real app, you might also send an email, push notification, etc.
print(f"Notification sent to user {user_id}: {message}")
db.commit()
@app.post("/broadcast/")
async def broadcast_message(
message: str,
user_ids: List[int],
background_tasks: BackgroundTasks,
db: Session = Depends(get_db)
):
# Add background task with database session
background_tasks.add_task(send_notifications, user_ids, message, db)
return {
"message": "Broadcasting initiated",
"recipients": len(user_ids)
}
In this example:
- We use FastAPI's dependency injection to get a database session
- Pass that session to our background task
- The task uses the session to perform database operations asynchronously
Important: When passing database connections to background tasks, ensure your connection pooling is configured correctly to prevent connection leaks. The database session should be properly closed after the background task is completed.
Dynamic Task Generation Based on Parameters
You can also use parameters to dynamically determine which background tasks to run.
Example 5: Dynamic Task Selection
from fastapi import FastAPI, BackgroundTasks, HTTPException
from enum import Enum
from typing import Dict, Any
app = FastAPI()
class ProcessingType(str, Enum):
IMAGE = "image"
TEXT = "text"
VIDEO = "video"
# Different processing functions for different types of content
async def process_image(data: Dict[str, Any]):
print(f"Processing image: {data.get('filename')}")
# Image processing logic
async def process_text(data: Dict[str, Any]):
print(f"Processing text with {len(data.get('content', ''))} characters")
# Text processing logic
async def process_video(data: Dict[str, Any]):
print(f"Processing video: {data.get('filename')}")
# Video processing logic
# Task registry - maps processing types to their handler functions
task_handlers = {
ProcessingType.IMAGE: process_image,
ProcessingType.TEXT: process_text,
ProcessingType.VIDEO: process_video,
}
@app.post("/process/{processing_type}")
async def process_content(
processing_type: ProcessingType,
data: Dict[str, Any],
background_tasks: BackgroundTasks
):
# Get the appropriate handler based on the processing type
handler = task_handlers.get(processing_type)
if not handler:
raise HTTPException(status_code=400, detail="Unsupported processing type")
# Add the selected task handler with the provided data
background_tasks.add_task(handler, data)
return {
"status": "processing",
"type": processing_type,
"data_received": len(data)
}
This pattern allows you to:
- Define specialized task functions for different operations
- Select the appropriate task handler dynamically based on parameters
- Maintain a clean, extensible codebase as you add new processing types
Best Practices for Task Parameters
When working with task parameters in FastAPI, follow these best practices:
-
Parameter Immutability: Pass immutable data when possible, or create copies of mutable data to avoid race conditions.
-
Validate Early: Validate parameters in your endpoint before passing them to background tasks.
-
Default Values: Provide sensible defaults for optional parameters.
-
Type Hints: Always use proper type hints for better documentation and IDE support.
-
Error Handling: Include error handling within your tasks, as exceptions won't be caught by FastAPI's exception handlers.
Example: Following Best Practices
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, EmailStr
from typing import List, Optional
import copy
app = FastAPI()
class EmailContent(BaseModel):
subject: str
body: str
class EmailTask(BaseModel):
recipients: List[EmailStr]
content: EmailContent
priority: int = 1
retry_count: int = 3
async def send_email_task(
task_data: EmailTask,
log_success: bool = True
):
# Create a copy to avoid potential mutations elsewhere
task = copy.deepcopy(task_data)
try:
# Simulate email sending with retry logic
for attempt in range(1, task.retry_count + 1):
try:
print(f"Sending email to {len(task.recipients)} recipients:")
print(f"Subject: {task.content.subject}")
print(f"Attempt {attempt} of {task.retry_count}")
# Simulate actual email sending here
# ...
if log_success:
print("Email sent successfully")
break
except Exception as e:
print(f"Attempt {attempt} failed: {str(e)}")
if attempt == task.retry_count:
raise
except Exception as e:
# Log the error - in a real app, you might want to store this in a database
print(f"Failed to send email after {task.retry_count} attempts: {str(e)}")
@app.post("/send-email/")
async def send_email(
email_task: EmailTask,
background_tasks: BackgroundTasks,
log_success: bool = True
):
# Validate email task data
if not email_task.recipients:
raise HTTPException(status_code=400, detail="No recipients provided")
if len(email_task.content.body) < 10:
raise HTTPException(status_code=400, detail="Email body too short")
# Add validated task to background tasks
background_tasks.add_task(send_email_task, email_task, log_success)
return {
"status": "queued",
"recipients_count": len(email_task.recipients),
"priority": email_task.priority
}
This example showcases:
- Deep copying of parameters to prevent mutation issues
- Comprehensive error handling inside the task
- Early validation before scheduling the task
- Proper type hints throughout the code
- Default parameter values
Summary
FastAPI's ability to pass parameters to background tasks provides flexibility and power when designing asynchronous operations in your API. Through this feature, you can:
- Create reusable background task functions
- Dynamically control task behavior based on request data
- Pass complex objects, including database connections and Pydantic models
- Build more maintainable and organized background processing systems
By following the best practices and patterns shown in this guide, you can implement robust background task systems that efficiently handle asynchronous operations while keeping your code clean and maintainable.
Additional Resources and Exercises
Resources
- FastAPI official documentation on background tasks
- Python asyncio documentation
- Starlette background tasks implementation
Exercises
-
Email Notification System: Build a system that sends different types of email notifications as background tasks, with parameters for customizing the email templates and recipient lists.
-
Scheduled Report Generation: Create an endpoint that generates complex reports in the background, with parameters to control the report format, date range, and content sections.
-
Image Processing Pipeline: Implement a system that processes uploaded images in the background with parameters for controlling resizing, filtering, and storage options.
-
Error Recovery: Enhance one of the examples to include robust error handling with automatic retry logic when background tasks fail.
-
Task Progress Tracking: Extend a background task to update a database with progress information, which can be queried by another endpoint.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)