FastAPI Long-running Tasks
Introduction
When building web applications with FastAPI, you'll often encounter situations where certain operations take a significant amount of time to complete. These might include:
- Processing large files
- Sending emails or notifications
- Running complex database operations
- Making multiple external API calls
- Generating reports
Such tasks can block your application's request-response cycle, leading to poor user experience if handled synchronously. In this tutorial, we'll learn how to manage long-running tasks in FastAPI applications without keeping users waiting.
Understanding the Problem
Let's start with a simple example that demonstrates the issue:
from fastapi import FastAPI
import time

app = FastAPI()

@app.get("/process-data")
def process_data():
    # Simulate a long-running task
    time.sleep(10)
    return {"message": "Data processed successfully"}
In this code, when a user makes a request to /process-data, they must wait a full 10 seconds before receiving a response. During this time:
- The user's browser appears to be loading
- A server worker is tied up for the full duration instead of handling other requests (the sketch after this list makes the blocking concrete)
- Server resources such as threads and connections stay occupied
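How severe the blocking is depends on how the endpoint is declared. A minimal sketch, assuming you run it with uvicorn: a plain def handler is executed in FastAPI's thread pool, so other requests can still be served while a worker thread sits in time.sleep, but an async def handler that calls blocking code stalls the entire event loop, and even a trivial /health endpoint stops responding until the sleep finishes.

from fastapi import FastAPI
import time

app = FastAPI()

# Blocking call inside an async handler: the event loop stalls for 10 seconds,
# so no other request (not even /health) is processed until it returns.
@app.get("/process-data-async")
async def process_data_async():
    time.sleep(10)
    return {"message": "Data processed successfully"}

@app.get("/health")
def health():
    return {"status": "ok"}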
Let's explore how to solve this problem using several techniques in FastAPI.
Solution 1: FastAPI Built-in Background Tasks
For simpler use cases, FastAPI provides a built-in BackgroundTasks class that allows you to run functions in the background after returning a response.
Basic Background Task Example
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def long_running_task(message: str):
    # Simulate a time-consuming process
    time.sleep(10)
    print(f"Task completed with message: {message}")

@app.get("/run-background-task")
async def run_background_task(background_tasks: BackgroundTasks):
    # Add the task to the background tasks
    background_tasks.add_task(long_running_task, "Hello from background task")
    # Return immediately while the task runs in the background
    return {"message": "Task started in the background"}
When you call this endpoint, you'll receive an immediate response, while the long_running_task function continues to execute in the background.
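Background tasks can also receive data from the request. The following is a small sketch rather than part of the FastAPI API itself: the Order model, the write_notification helper, and the notifications.log path are illustrative assumptions. It shows a POST endpoint that hands values from the request body, plus a keyword argument, to a task that runs after the response has been sent.

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

class Order(BaseModel):
    order_id: int
    email: str

def write_notification(email: str, message: str):
    # Runs after the response has been sent; here we simply append to a file
    with open("notifications.log", "a") as log:
        log.write(f"notify {email}: {message}\n")

@app.post("/orders")
async def create_order(order: Order, background_tasks: BackgroundTasks):
    # add_task takes the callable followed by its positional and keyword arguments
    background_tasks.add_task(
        write_notification, order.email, message=f"Order {order.order_id} received"
    )
    return {"status": "accepted", "order_id": order.order_id}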
When to Use Built-in Background Tasks
FastAPI's built-in BackgroundTasks is suitable for:
- Simple, short-lived background operations
- Tasks that don't need to be monitored or have their results retrieved later
- When you want a lightweight solution without additional dependencies
However, this approach has limitations:
- Tasks run in the same process as your web server
- If your server restarts, in-progress tasks are lost
- Not suitable for resource-intensive operations that might impact web server performance
Solution 2: Using Celery for Advanced Task Processing
For more complex scenarios, Celery is a robust distributed task queue that integrates well with FastAPI.
Setting Up Celery with FastAPI
First, install Celery and the Redis client library (we'll use Redis as the message broker; the Redis server itself is installed and started separately):
pip install celery redis
Create a project structure:
my_fastapi_app/
├── app/
│ ├── __init__.py
│ ├── main.py
│ └── celery_worker.py
└── requirements.txt
In celery_worker.py:
from celery import Celery
import time

# Initialize Celery with Redis as the broker and result backend
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def process_large_dataset(dataset_id: int):
    # Simulate processing a large dataset
    time.sleep(15)
    return {"status": "completed", "dataset_id": dataset_id, "results": "Data processed successfully"}
In main.py:
from fastapi import FastAPI

from .celery_worker import process_large_dataset

app = FastAPI()

@app.post("/process-dataset/{dataset_id}")
async def start_dataset_processing(dataset_id: int):
    # Dispatch the task to Celery
    task = process_large_dataset.delay(dataset_id)
    return {
        "task_id": task.id,
        "status": "processing",
        "message": f"Dataset {dataset_id} processing started"
    }

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    # Check the task status
    task = process_large_dataset.AsyncResult(task_id)
    if task.ready():
        return {
            "task_id": task_id,
            "status": "completed",
            "result": task.result
        }
    else:
        return {
            "task_id": task_id,
            "status": "processing"
        }
To run this system:
- Start Redis:
redis-server
- Start Celery worker:
celery -A app.celery_worker worker --loglevel=info
- Start FastAPI:
uvicorn app.main:app --reload
Benefits of Using Celery
- Distributed processing across multiple workers
- Task persistence (if the server restarts, tasks can continue)
- Task scheduling, retries, and monitoring (see the sketch after this list)
- Resource isolation between web and worker processes
- Scaling worker processes independently from web servers
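For example, retry behavior and time limits can be declared directly on the task decorator. The following is a minimal sketch assuming the same Redis broker as above; the exception type, backoff settings, and limits are illustrative choices rather than recommendations.

import urllib.request
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(
    autoretry_for=(OSError,),   # network failures surface as OSError subclasses
    retry_backoff=True,         # wait exponentially longer between attempts
    max_retries=3,
    time_limit=60,              # hard limit: the worker kills the task after 60 seconds
)
def fetch_external_data(url: str):
    # Fetch a resource; any OSError triggers an automatic retry
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode("utf-8")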
Solution 3: FastAPI with Redis Queue (RQ)
If Celery seems too complex, RQ (Redis Queue) offers a simpler alternative with Redis.
Install the requirements:
pip install rq fastapi redis
Implementation example:
from fastapi import FastAPI
import redis
from rq import Queue
import time

app = FastAPI()

# Connect to Redis
redis_conn = redis.Redis()

# Create an RQ queue
queue = Queue(connection=redis_conn)

def generate_report(report_id: str):
    # Simulate complex report generation
    time.sleep(20)
    return f"Report {report_id} data generated successfully"

@app.post("/generate-report/{report_id}")
async def start_report_generation(report_id: str):
    # Enqueue the job
    job = queue.enqueue(generate_report, report_id)
    return {
        "job_id": job.id,
        "status": "queued",
        "message": f"Report {report_id} generation started"
    }

@app.get("/report-status/{job_id}")
async def check_report_status(job_id: str):
    job = queue.fetch_job(job_id)
    if job is None:
        return {"error": "Job not found"}
    if job.is_finished:
        return {"status": "completed", "result": job.result}
    elif job.is_failed:
        return {"status": "failed", "error": str(job.exc_info)}
    else:
        return {"status": "in-progress"}
To run this system:
- Start Redis server
- Start RQ worker:
rq worker
- Run your FastAPI application:
uvicorn main:app --reload
Solution 4: Using FastAPI with APScheduler
For scheduled or recurring tasks, APScheduler works well with FastAPI:
pip install apscheduler
Implementation:
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time

app = FastAPI()
scheduler = BackgroundScheduler()

def daily_cleanup_task():
    print(f"Running daily cleanup at {datetime.now()}")
    # Simulate cleanup process
    time.sleep(5)
    print("Cleanup completed")

# Start the scheduler when the application starts
@app.on_event("startup")
async def startup_event():
    scheduler.add_job(daily_cleanup_task, 'interval', hours=24)
    scheduler.start()

# Shut down the scheduler when the application stops
@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()

@app.post("/schedule-one-time-task")
async def schedule_task():
    # Schedule a task to run once, one minute from now
    job = scheduler.add_job(
        daily_cleanup_task,
        'date',
        run_date=datetime.now() + timedelta(minutes=1)
    )
    return {"job_id": job.id, "status": "scheduled"}
Real-world Example: PDF Report Generation
Let's put everything together in a real-world example. Imagine we're building an API that generates PDF reports from data analytics:
from fastapi import FastAPI
from pydantic import BaseModel
from celery import Celery
import time

# Initialize FastAPI
app = FastAPI()

# Initialize Celery
celery_app = Celery(
    "report_generator",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

class ReportRequest(BaseModel):
    user_id: int
    report_type: str
    data_range: str
    email: str

# Simulate PDF generation and sending
@celery_app.task
def generate_and_send_report(request_data):
    # Extract request data
    user_id = request_data["user_id"]
    report_type = request_data["report_type"]
    data_range = request_data["data_range"]
    email = request_data["email"]

    print(f"Generating {report_type} report for user {user_id}...")
    # Simulate database query and data processing
    time.sleep(5)

    # Simulate PDF generation
    print(f"Creating PDF for {data_range} data...")
    time.sleep(10)

    # Simulate sending email
    print(f"Sending report to {email}...")
    time.sleep(3)

    return {
        "status": "completed",
        "report_file": f"report_{user_id}_{report_type}.pdf",
        "recipient": email
    }

@app.post("/generate-report")
async def request_report_generation(request: ReportRequest):
    # Convert Pydantic model to dict for Celery
    request_dict = request.model_dump()

    # Submit the task to Celery
    task = generate_and_send_report.delay(request_dict)

    return {
        "task_id": task.id,
        "status": "processing",
        "message": f"Your {request.report_type} report is being generated and will be sent to {request.email}"
    }

@app.get("/report-status/{task_id}")
async def get_report_status(task_id: str):
    task_result = generate_and_send_report.AsyncResult(task_id)

    if task_result.state == 'PENDING':
        response = {
            "status": "pending",
            "message": "Report generation is pending"
        }
    elif task_result.state == 'FAILURE':
        response = {
            "status": "failed",
            "message": "Report generation failed"
        }
    elif task_result.state == 'SUCCESS':
        response = {
            "status": "completed",
            "result": task_result.result
        }
    else:
        response = {
            "status": "processing",
            "message": "Report is being generated"
        }

    return response
In this example:
- Users request report generation via the /generate-report endpoint
- The request is passed to Celery for background processing
- Users get an immediate response with a task ID
- They can check the report status with the task ID (the client sketch after this list shows the polling loop)
- The report is generated and sent via email in the background
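To see the request-then-poll flow from the client's side, here is a small sketch using the requests library against a server assumed to be running on localhost:8000; the payload values are illustrative.

import time
import requests

# Kick off report generation
resp = requests.post("http://localhost:8000/generate-report", json={
    "user_id": 42,
    "report_type": "sales",
    "data_range": "2024-Q1",
    "email": "user@example.com",
})
task_id = resp.json()["task_id"]

# Poll the status endpoint until the task finishes
while True:
    status = requests.get(f"http://localhost:8000/report-status/{task_id}").json()
    if status["status"] in ("completed", "failed"):
        print(status)
        break
    time.sleep(2)  # wait a little before asking again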
Best Practices for Long-running Tasks
- Always provide feedback: Give users a way to check task status.
- Implement timeouts: Set reasonable timeouts for background tasks to prevent hung processes.
- Add error handling: Implement robust error handling and retries for failed tasks.
- Consider task priority: Some task systems allow setting priorities for different types of tasks.
- Monitor resource usage: Watch CPU, memory, and disk space used by background workers.
- Implement logging: Comprehensive logging helps debug issues in background tasks.
- Use task idempotency: Design tasks to be safely retriable without side effects (the sketch after this list touches on this together with timeouts and retries).
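As a rough illustration of the timeout, retry, and idempotency points above, here is a hedged Celery sketch that reuses the celery_app from the earlier examples. The report_already_sent and deliver_report helpers, along with the concrete limits, are assumptions made for the example, not part of Celery.

from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(
    bind=True,
    max_retries=3,
    soft_time_limit=120,   # raise SoftTimeLimitExceeded after two minutes
)
def send_report_email(self, report_id: str, email: str):
    # Idempotency guard: if a retry runs after a partial success, skip the resend.
    # report_already_sent is a hypothetical helper you would implement yourself.
    if report_already_sent(report_id, email):
        return {"status": "already_sent", "report_id": report_id}
    try:
        deliver_report(report_id, email)   # hypothetical delivery function
    except (ConnectionError, SoftTimeLimitExceeded) as exc:
        # Re-queue the task with a delay; Celery gives up after max_retries attempts
        raise self.retry(exc=exc, countdown=30)
    return {"status": "sent", "report_id": report_id}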
Summary
In this tutorial, we've explored several approaches to handling long-running tasks in FastAPI applications:
- FastAPI's built-in BackgroundTasks: Simple and lightweight, suitable for short tasks.
- Celery: Robust and scalable, ideal for complex distributed task processing.
- Redis Queue (RQ): Simpler alternative to Celery with Redis backend.
- APScheduler: Great for scheduled and recurring tasks.
Each approach has its own strengths and is suitable for different use cases. By implementing these patterns, you can build responsive FastAPI applications that handle time-consuming operations efficiently without keeping users waiting.
Additional Resources
- FastAPI Official Documentation on Background Tasks
- Celery Documentation
- Redis Queue Documentation
- APScheduler Documentation
Exercises
- Implement a FastAPI endpoint that processes an uploaded image in the background (resizing, applying filters).
- Create a scheduled task that cleans up temporary files daily.
- Build a small FastAPI application with Celery that sends a welcome email when a user signs up.
- Implement a progress tracking system for long-running tasks (e.g., 0-100% completion).
- Create a queue priority system where premium users' tasks get processed before free users' tasks.
By mastering these techniques, you'll be able to build highly responsive and scalable FastAPI applications capable of handling complex, time-consuming operations efficiently.