FastAPI Long-running Tasks

Introduction

When building web applications with FastAPI, you'll often encounter situations where certain operations take a significant amount of time to complete. These might include:

  • Processing large files
  • Sending emails or notifications
  • Running complex database operations
  • Making multiple external API calls
  • Generating reports

Such tasks can block your application's request-response cycle, leading to poor user experience if handled synchronously. In this tutorial, we'll learn how to manage long-running tasks in FastAPI applications without keeping users waiting.

Understanding the Problem

Let's start with a simple example that demonstrates the issue:

python
from fastapi import FastAPI
import time

app = FastAPI()

@app.get("/process-data")
def process_data():
    # Simulate a long-running task
    time.sleep(10)
    return {"message": "Data processed successfully"}

In this code, when a user makes a request to /process-data, they must wait a full 10 seconds before receiving a response. During this time:

  • The user's browser appears to be loading
  • The worker handling the request stays blocked for the full 10 seconds
  • Server resources are tied up

Let's explore how to solve this problem using several techniques in FastAPI.

Solution 1: FastAPI Built-in Background Tasks

For simpler use cases, FastAPI provides a built-in BackgroundTasks class that allows you to run functions in the background after returning a response.

Basic Background Task Example

python
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def long_running_task(message: str):
    # Simulate a time-consuming process
    time.sleep(10)
    print(f"Task completed with message: {message}")

@app.get("/run-background-task")
async def run_background_task(background_tasks: BackgroundTasks):
    # Add the task to the background tasks
    background_tasks.add_task(long_running_task, "Hello from background task")

    # Return immediately while the task runs in the background
    return {"message": "Task started in the background"}

When you call this endpoint, you'll receive an immediate response, while the long_running_task function continues to execute in the background.

When to Use Built-in Background Tasks

FastAPI's built-in BackgroundTasks is suitable for:

  • Simple, short-lived background operations
  • Tasks that don't need to be monitored or have their results retrieved later
  • When you want a lightweight solution without additional dependencies

However, this approach has limitations:

  • Tasks run in the same process as your web server
  • If your server restarts, in-progress tasks are lost
  • Not suitable for resource-intensive operations that might impact web server performance

Solution 2: Using Celery for Advanced Task Processing

For more complex scenarios, Celery is a robust distributed task queue that integrates well with FastAPI.

Setting Up Celery with FastAPI

First, install Celery and the Redis Python client (we'll use Redis as the message broker):

bash
pip install celery redis

Create a project structure:

my_fastapi_app/
├── app/
│   ├── __init__.py
│   ├── main.py
│   └── celery_worker.py
└── requirements.txt

In celery_worker.py:

python
from celery import Celery

# Initialize Celery with Redis as the broker
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)

@celery_app.task
def process_large_dataset(dataset_id: int):
# Simulate processing a large dataset
import time
time.sleep(15)
return {"status": "completed", "dataset_id": dataset_id, "results": "Data processed successfully"}

In main.py:

python
from fastapi import FastAPI
from .celery_worker import process_large_dataset

app = FastAPI()

@app.post("/process-dataset/{dataset_id}")
async def start_dataset_processing(dataset_id: int):
    # Dispatch the task to Celery
    task = process_large_dataset.delay(dataset_id)

    return {
        "task_id": task.id,
        "status": "processing",
        "message": f"Dataset {dataset_id} processing started"
    }

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    # Check the task status
    task = process_large_dataset.AsyncResult(task_id)

    if task.ready():
        return {
            "task_id": task_id,
            "status": "completed",
            "result": task.result
        }
    else:
        return {
            "task_id": task_id,
            "status": "processing"
        }

To run this system:

  1. Start Redis: redis-server
  2. Start Celery worker: celery -A app.celery_worker worker --loglevel=info
  3. Start FastAPI: uvicorn app.main:app --reload

Benefits of Using Celery

  • Distributed processing across multiple workers
  • Task persistence (if the server restarts, tasks can continue)
  • Task scheduling, retries, and monitoring (see the retry sketch after this list)
  • Resource isolation between web and worker processes
  • Scaling worker processes independently from web servers
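
To make the retry point concrete, here is a minimal sketch (not part of the tutorial's project) of a Celery task that retries automatically on failure. The autoretry_for, retry_backoff, and max_retries options are standard Celery task options; fetch_external_data, the example URL handling, and the use of the requests library are assumptions for illustration.

python
import requests
from celery import Celery

# Mirrors the broker/backend setup from celery_worker.py above
celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

# Hypothetical task: retry up to 3 times on request errors, with exponential backoff
@celery_app.task(
    autoretry_for=(requests.RequestException,),
    retry_backoff=True,   # wait ~1s, 2s, 4s, ... between attempts
    max_retries=3,
)
def fetch_external_data(url: str):
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # raises on HTTP errors, which triggers a retry
    return response.json()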

Solution 3: FastAPI with Redis Queue (RQ)

If Celery seems too complex for your needs, RQ (Redis Queue) offers a simpler, Redis-backed alternative.

Install the requirements:

bash
pip install rq fastapi redis

Implementation example:

python
from fastapi import FastAPI
import redis
from rq import Queue
import time

app = FastAPI()

# Connect to Redis
redis_conn = redis.Redis()
# Create an RQ queue
queue = Queue(connection=redis_conn)

def generate_report(report_id: str):
    # Simulate complex report generation
    time.sleep(20)
    return f"Report {report_id} data generated successfully"

@app.post("/generate-report/{report_id}")
async def start_report_generation(report_id: str):
    # Enqueue the job
    job = queue.enqueue(generate_report, report_id)

    return {
        "job_id": job.id,
        "status": "queued",
        "message": f"Report {report_id} generation started"
    }

@app.get("/report-status/{job_id}")
async def check_report_status(job_id: str):
    job = queue.fetch_job(job_id)

    if job is None:
        return {"error": "Job not found"}

    if job.is_finished:
        return {"status": "completed", "result": job.result}
    elif job.is_failed:
        return {"status": "failed", "error": str(job.exc_info)}
    else:
        return {"status": "in-progress"}

To run this system:

  1. Start Redis server
  2. Start RQ worker: rq worker
  3. Run your FastAPI application: uvicorn main:app --reload

Solution 4: Using FastAPI with APScheduler

For scheduled or recurring tasks, APScheduler works well with FastAPI:

bash
pip install apscheduler

Implementation:

python
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timedelta
import time

app = FastAPI()
scheduler = BackgroundScheduler()

def daily_cleanup_task():
print(f"Running daily cleanup at {datetime.now()}")
# Simulate cleanup process
time.sleep(5)
print("Cleanup completed")

# Start the scheduler when the application starts
@app.on_event("startup")
async def startup_event():
scheduler.add_job(daily_cleanup_task, 'interval', hours=24)
scheduler.start()

# Shutdown the scheduler when the application stops
@app.on_event("shutdown")
async def shutdown_event():
scheduler.shutdown()

@app.post("/schedule-one-time-task")
async def schedule_task():
    # Schedule a task to run once, 1 minute from now
    # (the 'date' trigger expects a datetime for run_date, not a Unix timestamp)
    job = scheduler.add_job(
        daily_cleanup_task,
        'date',
        run_date=datetime.now() + timedelta(seconds=60)
    )

    return {"job_id": job.id, "status": "scheduled"}

Real-world Example: PDF Report Generation

Let's put everything together in a real-world example. Imagine we're building an API that generates PDF reports from data analytics:

python
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from celery import Celery
import time
import os

# Initialize FastAPI
app = FastAPI()

# Initialize Celery
celery_app = Celery(
"report_generator",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)

class ReportRequest(BaseModel):
user_id: int
report_type: str
data_range: str
email: str

# Simulate PDF generation and sending
@celery_app.task
def generate_and_send_report(request_data):
    # Extract request data
    user_id = request_data["user_id"]
    report_type = request_data["report_type"]
    data_range = request_data["data_range"]
    email = request_data["email"]

    print(f"Generating {report_type} report for user {user_id}...")

    # Simulate database query and data processing
    time.sleep(5)

    # Simulate PDF generation
    print(f"Creating PDF for {data_range} data...")
    time.sleep(10)

    # Simulate sending email
    print(f"Sending report to {email}...")
    time.sleep(3)

    return {
        "status": "completed",
        "report_file": f"report_{user_id}_{report_type}.pdf",
        "recipient": email
    }

@app.post("/generate-report")
async def request_report_generation(request: ReportRequest):
    # Convert Pydantic model to dict for Celery
    request_dict = request.model_dump()

    # Submit the task to Celery
    task = generate_and_send_report.delay(request_dict)

    return {
        "task_id": task.id,
        "status": "processing",
        "message": f"Your {request.report_type} report is being generated and will be sent to {request.email}"
    }

@app.get("/report-status/{task_id}")
async def get_report_status(task_id: str):
    task_result = generate_and_send_report.AsyncResult(task_id)

    if task_result.state == 'PENDING':
        response = {
            "status": "pending",
            "message": "Report generation is pending"
        }
    elif task_result.state == 'FAILURE':
        response = {
            "status": "failed",
            "message": "Report generation failed"
        }
    elif task_result.state == 'SUCCESS':
        response = {
            "status": "completed",
            "result": task_result.result
        }
    else:
        response = {
            "status": "processing",
            "message": "Report is being generated"
        }

    return response

In this example:

  1. Users request report generation via the /generate-report endpoint
  2. The request is passed to Celery for background processing
  3. Users get an immediate response with a task ID
  4. They can check the report status with the task ID (a client-side polling sketch follows this list)
  5. The report is generated and sent via email in the background
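
To illustrate the flow end to end from the caller's perspective, here is a small polling sketch. It assumes the API is running locally on port 8000 and that the requests library is installed; the payload values are made up for illustration:

python
import time
import requests

BASE_URL = "http://localhost:8000"  # assumed local development server

# 1. Submit the report request (example values)
payload = {
    "user_id": 42,
    "report_type": "sales",
    "data_range": "last_30_days",
    "email": "user@example.com",
}
task_id = requests.post(f"{BASE_URL}/generate-report", json=payload).json()["task_id"]

# 2. Poll the status endpoint until the task finishes
while True:
    status = requests.get(f"{BASE_URL}/report-status/{task_id}").json()
    if status["status"] in ("completed", "failed"):
        print(status)
        break
    time.sleep(2)  # wait between polls instead of hammering the API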

Best Practices for Long-running Tasks

  1. Always provide feedback: Give users a way to check task status.
  2. Implement timeouts: Set reasonable timeouts for background tasks to prevent hung processes.
  3. Add error handling: Implement robust error handling and retries for failed tasks.
  4. Consider task priority: Some task systems allow setting priorities for different types of tasks.
  5. Monitor resource usage: Watch CPU, memory, and disk space used by background workers.
  6. Implement logging: Comprehensive logging helps debug issues in background tasks.
  7. Use task idempotency: Design tasks to be safely retriable without side effects (a sketch combining idempotency with a timeout follows this list).
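
As a concrete illustration of points 2 and 7, here is a minimal sketch of a Celery task that combines a soft time limit with an idempotency guard. soft_time_limit and SoftTimeLimitExceeded are standard Celery features; the task name, the /tmp output path, and the file-existence check are simplified assumptions for illustration.

python
import os
import time
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task(soft_time_limit=60)  # raise SoftTimeLimitExceeded after 60 seconds
def build_report(report_id: str):
    output_path = f"/tmp/report_{report_id}.pdf"  # hypothetical output location

    # Idempotency guard: if a previous attempt already produced the file, skip the work
    if os.path.exists(output_path):
        return {"status": "completed", "report_file": output_path, "skipped": True}

    try:
        time.sleep(30)  # stand-in for the real report generation
        with open(output_path, "wb") as f:
            f.write(b"%PDF-1.4 placeholder")
        return {"status": "completed", "report_file": output_path}
    except SoftTimeLimitExceeded:
        # Remove partial output so a retried run starts from a clean state
        if os.path.exists(output_path):
            os.remove(output_path)
        return {"status": "failed", "reason": "timed out"}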

Summary

In this tutorial, we've explored several approaches to handling long-running tasks in FastAPI applications:

  1. FastAPI's built-in BackgroundTasks: Simple and lightweight, suitable for short tasks.
  2. Celery: Robust and scalable, ideal for complex distributed task processing.
  3. Redis Queue (RQ): Simpler alternative to Celery with Redis backend.
  4. APScheduler: Great for scheduled and recurring tasks.

Each approach has its own strengths and is suitable for different use cases. By implementing these patterns, you can build responsive FastAPI applications that handle time-consuming operations efficiently without keeping users waiting.

Exercises

  1. Implement a FastAPI endpoint that processes an uploaded image in the background (resizing, applying filters).
  2. Create a scheduled task that cleans up temporary files daily.
  3. Build a small FastAPI application with Celery that sends a welcome email when a user signs up.
  4. Implement a progress tracking system for long-running tasks (e.g., 0-100% completion).
  5. Create a queue priority system where premium users' tasks get processed before free users' tasks.

By mastering these techniques, you'll be able to build highly responsive and scalable FastAPI applications capable of handling complex, time-consuming operations efficiently.


