FastAPI Celery Integration

Introduction

When building web applications with FastAPI, you may encounter scenarios where certain operations take too long to complete during an HTTP request lifecycle. These could be tasks like sending emails, processing large files, or performing complex computations. FastAPI does include a built-in background tasks system, but those tasks run inside the same process as your web server and offer no retries, persistence, or distribution across machines, which makes them a poor fit for more complex scenarios.

Celery is a distributed task queue that can handle these long-running processes outside of your web application's request-response cycle. In this tutorial, we'll learn how to integrate Celery with FastAPI to create a robust system for handling background tasks.

What is Celery?

Celery is an asynchronous task queue/job queue based on distributed message passing. It focuses on real-time operations but also supports scheduling. Celery is used to run tasks in the background, outside the HTTP request-response cycle.

Key components of a Celery system include:

  • Tasks: Python functions registered with Celery
  • Message Broker: Queue system that distributes tasks (like Redis or RabbitMQ)
  • Workers: Processes that execute the tasks
  • Result Backend: Optional storage for task results

Setting Up the Environment

Let's start by setting up our development environment.

Installation

First, we need to install the necessary packages:

bash
pip install fastapi uvicorn celery redis

We're using Redis as our message broker and result backend, but you could also use RabbitMQ or other supported brokers.

Project Structure

We'll set up our project with the following structure:

project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   └── tasks.py
├── requirements.txt
└── README.md
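
The requirements.txt can simply list the packages installed above; versions are left unpinned here, so pin them as needed for your project:

fastapi
uvicorn
celery
redis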

Basic Celery Configuration

Let's start by creating our Celery application in the celery_app.py file:

python
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

# Optional configuration
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)

This creates a Celery application instance and configures it to use Redis as both the broker and the result backend.
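
Two more settings are often worth adding at this point; a small sketch (result_expires and task_track_started are standard Celery settings, and the values below are just examples):

python
# Optional extras (example values)
celery_app.conf.update(
    result_expires=3600,      # Discard stored task results after one hour
    task_track_started=True,  # Report STARTED instead of PENDING once a worker picks up a task
)

If you later switch to RabbitMQ, only the broker URL changes (for example amqp://guest:guest@localhost:5672// with RabbitMQ's default credentials); the rest of the configuration stays the same.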

Defining Celery Tasks

Now, let's define some tasks in the tasks.py file:

python
from app.celery_app import celery_app
import time

@celery_app.task
def add(x, y):
    """Add two numbers (slow operation for demo)"""
    time.sleep(5)  # Simulate a time-consuming task
    return x + y

@celery_app.task(bind=True)
def process_file(self, file_path):
    """Process a file with progress tracking"""
    total_steps = 10
    for i in range(total_steps):
        # Update task state to track progress
        self.update_state(state='PROGRESS', meta={'current': i, 'total': total_steps})
        time.sleep(1)  # Simulate processing

    return {"status": "completed", "file": file_path}

Integrating with FastAPI

Now, let's create our FastAPI application and integrate it with Celery in main.py:

python
from fastapi import FastAPI
from celery.result import AsyncResult

from app.celery_app import celery_app
from app.tasks import add, process_file

app = FastAPI(title="FastAPI Celery Integration")

@app.get("/")
async def root():
    return {"message": "FastAPI Celery Integration Example"}

@app.post("/tasks/add")
async def add_task(x: int, y: int):
    """Submit an addition task to Celery"""
    task = add.delay(x, y)
    return {"task_id": task.id, "message": "Task submitted to queue"}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a task"""
    task_result = AsyncResult(task_id, app=celery_app)

    result = {
        "task_id": task_id,
        "status": task_result.status,
    }

    if task_result.status == "SUCCESS":
        result["result"] = task_result.get()
    elif task_result.status == "PROGRESS":
        result["progress"] = task_result.info

    return result

@app.post("/tasks/process-file")
async def start_file_processing(file_path: str):
    """Start file processing task"""
    task = process_file.delay(file_path)
    return {"task_id": task.id, "message": "File processing started"}

Running the Application

To run this application, you'll need to start three components:

  1. The FastAPI server:
bash
uvicorn app.main:app --reload
  2. The Redis server (if not already running):
bash
redis-server
  3. The Celery worker:
bash
celery -A app.celery_app worker --loglevel=info

Example Workflow

Let's see what a full workflow looks like:

  1. Submit a task to add numbers:

    • Request: POST /tasks/add?x=5&y=3
    • Response:
      json
      {
        "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
        "message": "Task submitted to queue"
      }
  2. Check task status:

    • Request: GET /tasks/7a5cb1ae-50e7-4f9c-8799-3a6d32488edb
    • Response (while processing):
      json
      {
        "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
        "status": "PENDING"
      }
    • Response (after completion):
      json
      {
        "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
        "status": "SUCCESS",
        "result": 8
      }
  3. Submit a file processing task:

    • Request: POST /tasks/process-file?file_path=/path/to/file.txt
    • Response:
      json
      {
        "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
        "message": "File processing started"
      }
  4. Check file processing status:

    • Request: GET /tasks/e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a
    • Response (while processing):
      json
      {
        "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
        "status": "PROGRESS",
        "progress": {
          "current": 4,
          "total": 10
        }
      }
    • Response (after completion):
      json
      {
        "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
        "status": "SUCCESS",
        "result": {
          "status": "completed",
          "file": "/path/to/file.txt"
        }
      }
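
The same workflow can also be driven programmatically; a minimal sketch using the requests library (assumed to be installed separately) against a server running on localhost:8000:

python
import time
import requests

BASE = "http://localhost:8000"

# Submit the addition task
resp = requests.post(f"{BASE}/tasks/add", params={"x": 5, "y": 3})
task_id = resp.json()["task_id"]

# Poll until the task leaves the pending/in-progress states
while True:
    status = requests.get(f"{BASE}/tasks/{task_id}").json()
    if status["status"] not in ("PENDING", "STARTED", "PROGRESS"):
        break
    time.sleep(1)

print(status)  # e.g. {'task_id': '...', 'status': 'SUCCESS', 'result': 8}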

Advanced Features

Task Scheduling

Celery supports task scheduling. Let's add a scheduled task to our application:

python
# In celery_app.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'app.tasks.cleanup_temp_files',
        'schedule': crontab(minute=0, hour='*'),  # Run at the start of every hour
        'args': (),
    },
}

# In tasks.py
@celery_app.task
def cleanup_temp_files():
    """Clean up temporary files"""
    # Implementation details
    print("Cleaning up temporary files")
    return {"status": "cleaned", "count": 5}  # Example result

To run the scheduler, you'll need to start Celery Beat:

bash
celery -A app.celery_app beat --loglevel=info
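
For development, you can also embed the scheduler in the worker process with the -B flag instead of running a separate beat process (not recommended for production):

bash
celery -A app.celery_app worker -B --loglevel=info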

Error Handling

It's important to handle errors in our Celery tasks:

python
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to_email, subject, body):
    """Send an email with retry logic"""
    try:
        # Code to send email
        print(f"Sending email to {to_email}")
        # Simulate error sometimes
        if "error" in to_email:
            raise ValueError("Failed to send email")
        return {"status": "sent", "to": to_email}
    except Exception as exc:
        # Retry the task with exponential backoff
        retry_in = 5 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_in)
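
Celery can also handle this retry boilerplate for you through decorator options; a short sketch of the same idea using autoretry_for and retry_backoff (both standard task options in Celery 4+), with a hypothetical send_email_auto task:

python
@celery_app.task(autoretry_for=(ValueError,), retry_backoff=5, max_retries=3)
def send_email_auto(to_email, subject, body):
    """Same email logic, but retries on ValueError are scheduled by Celery itself"""
    print(f"Sending email to {to_email}")
    if "error" in to_email:
        raise ValueError("Failed to send email")
    return {"status": "sent", "to": to_email}

With retry_backoff=5, retries are delayed with exponential backoff starting at roughly five seconds.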

Extending the FastAPI Integration

Let's create a more sophisticated endpoint for email sending. Note that Pydantic's EmailStr type requires the email-validator package (pip install pydantic[email]):

python
from pydantic import BaseModel, EmailStr

class EmailRequest(BaseModel):
    to_email: EmailStr
    subject: str
    body: str

@app.post("/tasks/send-email")
async def send_email_task(email_req: EmailRequest):
    """Queue an email to be sent"""
    task = send_email.delay(
        email_req.to_email,
        email_req.subject,
        email_req.body
    )
    return {"task_id": task.id, "message": "Email queued for delivery"}

Real-world Example: Document Processing Service

Let's put everything together in a real-world example of a document processing service:

python
# app/tasks.py
import time
from app.celery_app import celery_app

@celery_app.task(bind=True)
def process_document(self, document_id, options):
    """Process a document with various transformations"""
    # Simulate a complex document processing pipeline
    steps = [
        "validating",
        "parsing",
        "transforming",
        "optimizing",
        "saving"
    ]

    total_steps = len(steps)

    for i, step in enumerate(steps):
        # Update progress (the same 'PROGRESS' state the status endpoint checks for)
        self.update_state(
            state='PROGRESS',
            meta={
                'current_step': i + 1,
                'total_steps': total_steps,
                'step_name': step,
                'progress_percent': int((i + 0.5) * 100 / total_steps)
            }
        )

        # Simulate processing time for this step
        time.sleep(2)

        # If this was a real processor, we might have actual work here
        if step == "parsing" and options.get("strict", False):
            # Simulate a strict-parsing failure; raising an exception marks the
            # task as FAILURE and stores the error in the result backend
            if document_id.endswith("_bad"):
                raise ValueError("Document failed strict validation")

    # Success!
    result = {
        'status': 'completed',
        'document_id': document_id,
        'pages': 5,  # Example metadata about the processed document
        'output_path': f"/processed/{document_id}.pdf"
    }

    return result

python
# app/main.py (additional endpoints)
from pydantic import BaseModel
from typing import Dict, Any, Optional

from app.tasks import process_document

class DocumentProcessRequest(BaseModel):
    document_id: str
    options: Dict[str, Any] = {}
    priority: Optional[str] = "normal"  # Could be "low", "normal", "high"

@app.post("/documents/process")
async def process_document_endpoint(request: DocumentProcessRequest):
    """Submit a document for processing"""
    # You could adjust task priority based on the request
    queue = "high_priority" if request.priority == "high" else "default"

    task = process_document.apply_async(
        args=[request.document_id, request.options],
        queue=queue
    )

    return {
        "task_id": task.id,
        "document_id": request.document_id,
        "status": "submitted"
    }
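
One caveat: routing a task to a high_priority queue only helps if a worker is actually consuming that queue. By default a worker only listens on the queue named celery, so for this example you would start the worker (or a dedicated one) with an explicit queue list:

bash
celery -A app.celery_app worker -Q default,high_priority --loglevel=info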

Best Practices

When integrating Celery with FastAPI, keep these best practices in mind:

  1. Keep tasks idempotent - A task should produce the same result even if run multiple times
  2. Pass simple data to tasks - Avoid complex objects; use IDs and fetch data inside the task
  3. Set timeouts - Prevent tasks from running too long (see the sketch after this list)
  4. Monitor your workers - Use tools like Flower (pip install flower) to monitor Celery
  5. Use proper serialization - JSON is recommended for most cases
  6. Handle results properly - Don't keep results forever if you don't need them
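
For item 3, a minimal sketch of per-task time limits using a hypothetical long_running_job task (time_limit and soft_time_limit are standard Celery task options; the values are only examples):

python
from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(soft_time_limit=50, time_limit=60)
def long_running_job(job_id):
    """Hard-killed after 60 seconds; gets a chance to clean up after 50"""
    try:
        ...  # actual long-running work goes here
    except SoftTimeLimitExceeded:
        # Clean up partial work before the hard limit hits
        return {"status": "timed_out", "job_id": job_id}

For item 4, Flower is typically started with celery -A app.celery_app flower and serves its dashboard on port 5555 by default.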

Deployment Considerations

When deploying a FastAPI + Celery application to production:

  1. Use a process manager like Supervisor to keep workers running
  2. Consider using Docker and docker-compose to simplify deployment
  3. Scale workers horizontally based on load
  4. Configure Redis with persistence if task reliability is crucial
  5. Implement proper logging for both the API and workers
  6. Set up monitoring to track task throughput and errors

Summary

In this tutorial, we've learned how to integrate Celery with FastAPI to handle background tasks efficiently. We've covered:

  • Setting up a Celery application with Redis as broker and backend
  • Defining Celery tasks with different complexities
  • Creating FastAPI endpoints that submit tasks and check their status
  • Implementing advanced features like task scheduling and error handling
  • Building a real-world document processing service example

Celery provides a powerful way to offload time-consuming operations from your web server, improving responsiveness and user experience in your FastAPI applications.

Exercises

  1. Extend the document processing example to include a webhook notification when processing completes
  2. Create a dashboard endpoint that shows all currently running tasks
  3. Implement task cancellation functionality
  4. Add rate limiting to prevent users from submitting too many tasks at once
  5. Create a periodic task that cleans up completed tasks older than 24 hours

