FastAPI Celery Integration
Introduction
When building web applications with FastAPI, you may encounter scenarios where certain operations take too long to complete during an HTTP request lifecycle. These could be tasks like sending emails, processing large files, or performing complex computations. While FastAPI includes a built-in background tasks system, it runs tasks in the same process as your web server and offers no retries, scheduling, or status tracking, so it falls short for more complex scenarios.
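For context, FastAPI's built-in mechanism looks roughly like this. This is a minimal sketch, and send_welcome_email is a hypothetical helper rather than part of this tutorial's code:

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def send_welcome_email(email: str) -> None:
    # Hypothetical helper: imagine this talks to an SMTP server
    print(f"Sending welcome email to {email}")

@app.post("/signup")
async def signup(email: str, background_tasks: BackgroundTasks):
    # The function runs in the same process, after the response is sent
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "Signed up"}
```

This works for quick fire-and-forget jobs, but there is no way to retry failures, schedule work, distribute it across machines, or check progress later.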
Celery is a distributed task queue that can handle these long-running processes outside of your web application's request-response cycle. In this tutorial, we'll learn how to integrate Celery with FastAPI to create a robust system for handling background tasks.
What is Celery?
Celery is an asynchronous task queue/job queue based on distributed message passing. It focuses on real-time operations but also supports scheduling. Celery is used to run tasks in the background, outside the HTTP request-response cycle.
Key components of a Celery system include:
- Tasks: Python functions registered with Celery
- Message Broker: Queue system that distributes tasks (like Redis or RabbitMQ)
- Workers: Processes that execute the tasks
- Result Backend: Optional storage for task results
Setting Up the Environment
Let's start by setting up our development environment.
Installation
First, we need to install the necessary packages:
pip install fastapi uvicorn celery redis
We're using Redis as our message broker and result backend, but you could also use RabbitMQ or other supported brokers.
Project Structure
We'll set up our project with the following structure:
project/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ └── tasks.py
├── requirements.txt
└── README.md
Basic Celery Configuration
Let's start by creating our Celery application in the celery_app.py file:
```python
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
    include=["app.tasks"],  # Ensure the worker imports and registers our tasks
)

# Optional configuration
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
)
```

This creates a Celery application instance and configures it to use Redis as both the broker and the result backend. The include option tells the worker to import app.tasks at startup so the tasks defined there are registered; without it, the worker would reject them as unregistered tasks.
Defining Celery Tasks
Now, let's define some tasks in the tasks.py file:
```python
from app.celery_app import celery_app
import time

@celery_app.task
def add(x, y):
    """Add two numbers (slow operation for demo)"""
    time.sleep(5)  # Simulate a time-consuming task
    return x + y

@celery_app.task(bind=True)
def process_file(self, file_path):
    """Process a file with progress tracking"""
    total_steps = 10
    for i in range(total_steps):
        # Update task state to track progress
        self.update_state(state='PROGRESS', meta={'current': i, 'total': total_steps})
        time.sleep(1)  # Simulate processing
    return {"status": "completed", "file": file_path}
```
Integrating with FastAPI
Now, let's create our FastAPI application and integrate it with Celery in main.py:
```python
from fastapi import FastAPI
from celery.result import AsyncResult

from app.celery_app import celery_app
from app.tasks import add, process_file

app = FastAPI(title="FastAPI Celery Integration")

@app.get("/")
async def root():
    return {"message": "FastAPI Celery Integration Example"}

@app.post("/tasks/add")
async def add_task(x: int, y: int):
    """Submit an addition task to Celery"""
    task = add.delay(x, y)
    return {"task_id": task.id, "message": "Task submitted to queue"}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Get the status of a task"""
    task_result = AsyncResult(task_id, app=celery_app)
    result = {
        "task_id": task_id,
        "status": task_result.status,
    }
    if task_result.status == "SUCCESS":
        result["result"] = task_result.get()
    elif task_result.status == "PROGRESS":
        result["progress"] = task_result.info
    return result

@app.post("/tasks/process-file")
async def start_file_processing(file_path: str):
    """Start file processing task"""
    task = process_file.delay(file_path)
    return {"task_id": task.id, "message": "File processing started"}
```
Running the Application
To run this application, you'll need to start three components:
- The FastAPI server:
uvicorn app.main:app --reload
- The Redis server (if not already running):
redis-server
- The Celery worker:
celery -A app.celery_app worker --loglevel=info
Example Workflow
Let's walk through what a full workflow looks like:
- Submit a task to add numbers:
  - Request: POST /tasks/add?x=5&y=3
  - Response:

    ```json
    {
      "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
      "message": "Task submitted to queue"
    }
    ```
- Check task status:
  - Request: GET /tasks/7a5cb1ae-50e7-4f9c-8799-3a6d32488edb
  - Response (while processing):

    ```json
    {
      "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
      "status": "PENDING"
    }
    ```
  - Response (after completion):

    ```json
    {
      "task_id": "7a5cb1ae-50e7-4f9c-8799-3a6d32488edb",
      "status": "SUCCESS",
      "result": 8
    }
    ```
- Submit a file processing task:
  - Request: POST /tasks/process-file?file_path=/path/to/file.txt
  - Response:

    ```json
    {
      "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
      "message": "File processing started"
    }
    ```
- Check file processing status:
  - Request: GET /tasks/e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a
  - Response (while processing):

    ```json
    {
      "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
      "status": "PROGRESS",
      "progress": {
        "current": 4,
        "total": 10
      }
    }
    ```
  - Response (after completion):

    ```json
    {
      "task_id": "e48d9c67-f4c5-4f3a-b8da-54d9b2c89f1a",
      "status": "SUCCESS",
      "result": {
        "status": "completed",
        "file": "/path/to/file.txt"
      }
    }
    ```
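If you would rather drive this workflow from Python than from a browser or curl, a small polling client could look like the sketch below. It assumes the API runs on localhost:8000 and that the requests package is installed; it is not part of the project code:

```python
import time
import requests

BASE_URL = "http://localhost:8000"  # Assumed local development address

# Submit an addition task
response = requests.post(f"{BASE_URL}/tasks/add", params={"x": 5, "y": 3})
task_id = response.json()["task_id"]

# Poll the status endpoint until the task finishes
while True:
    status = requests.get(f"{BASE_URL}/tasks/{task_id}").json()
    if status["status"] == "SUCCESS":
        print("Result:", status["result"])
        break
    if status["status"] == "FAILURE":
        print("Task failed")
        break
    print("Waiting, current status:", status["status"])
    time.sleep(1)
```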
Advanced Features
Task Scheduling
Celery supports task scheduling. Let's add a scheduled task to our application:
```python
# In celery_app.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    'cleanup-every-hour': {
        'task': 'app.tasks.cleanup_temp_files',
        'schedule': crontab(minute=0, hour='*'),  # Run at the start of every hour
        'args': (),
    },
}
```

```python
# In tasks.py
@celery_app.task
def cleanup_temp_files():
    """Clean up temporary files"""
    # Implementation details
    print("Cleaning up temporary files")
    return {"status": "cleaned", "count": 5}  # Example result
```
To run the scheduler, you'll need to start Celery Beat:
celery -A app.celery_app beat --loglevel=info
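During development it can be convenient to run the scheduler embedded in a worker process instead of as a separate service (Celery's -B flag); in production, beat should run as its own process:
celery -A app.celery_app worker -B --loglevel=info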
Error Handling
It's important to handle errors in our Celery tasks:
```python
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to_email, subject, body):
    """Send an email with retry logic"""
    try:
        # Code to send email
        print(f"Sending email to {to_email}")
        # Simulate error sometimes
        if "error" in to_email:
            raise ValueError("Failed to send email")
        return {"status": "sent", "to": to_email}
    except Exception as exc:
        # Retry the task with exponential backoff
        retry_in = 5 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_in)
```
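Celery can also handle this retry boilerplate declaratively. As a sketch of the equivalent task using automatic retries (the body is a placeholder, not a real email sender):

```python
@celery_app.task(
    autoretry_for=(ValueError,),  # Retry automatically on these exceptions
    retry_backoff=True,           # Exponential backoff between attempts
    retry_backoff_max=60,         # Cap the delay at 60 seconds
    retry_jitter=True,            # Randomize delays to avoid thundering herds
    max_retries=3,
)
def send_email_auto(to_email, subject, body):
    """Same behavior as send_email, with retries handled by Celery."""
    print(f"Sending email to {to_email}")
    if "error" in to_email:
        raise ValueError("Failed to send email")
    return {"status": "sent", "to": to_email}
```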
Extending the FastAPI Integration
Let's create a more sophisticated endpoint for email sending:
```python
from pydantic import BaseModel, EmailStr  # EmailStr requires the email-validator extra: pip install "pydantic[email]"

from app.tasks import send_email

class EmailRequest(BaseModel):
    to_email: EmailStr
    subject: str
    body: str

@app.post("/tasks/send-email")
async def send_email_task(email_req: EmailRequest):
    """Queue an email to be sent"""
    task = send_email.delay(
        email_req.to_email,
        email_req.subject,
        email_req.body
    )
    return {"task_id": task.id, "message": "Email queued for delivery"}
```
Real-world Example: Document Processing Service
Let's put everything together in a real-world example of a document processing service:
```python
# app/tasks.py
import time

from celery import states
from celery.exceptions import Ignore

from app.celery_app import celery_app

@celery_app.task(bind=True)
def process_document(self, document_id, options):
    """Process a document with various transformations"""
    # Simulate a complex document processing pipeline
    steps = [
        "validating",
        "parsing",
        "transforming",
        "optimizing",
        "saving",
    ]
    total_steps = len(steps)

    for i, step in enumerate(steps):
        # Update progress
        self.update_state(
            state=states.STARTED,
            meta={
                'current_step': i + 1,
                'total_steps': total_steps,
                'step_name': step,
                'progress_percent': int((i + 0.5) * 100 / total_steps)
            }
        )

        # Simulate processing time for this step
        time.sleep(2)

        # If this was a real processor, we might have actual work here
        if step == "parsing" and options.get("strict", False):
            # Simulate finding an error with strict parsing
            if document_id.endswith("_bad"):
                self.update_state(
                    state=states.FAILURE,
                    meta={'error': 'Document failed strict validation'}
                )
                # Raising Ignore() stops the task here and keeps the FAILURE
                # state we just set; returning normally would let Celery
                # overwrite it with SUCCESS.
                raise Ignore()

    # Success!
    result = {
        'status': 'completed',
        'document_id': document_id,
        'pages': 5,  # Example metadata about the processed document
        'output_path': f"/processed/{document_id}.pdf"
    }
    return result
```
```python
# app/main.py (additional endpoints)
from typing import Any, Dict, Optional

from pydantic import BaseModel

from app.tasks import process_document

class DocumentProcessRequest(BaseModel):
    document_id: str
    options: Dict[str, Any] = {}
    priority: Optional[str] = "normal"  # Could be "low", "normal", "high"

@app.post("/documents/process")
async def process_document_endpoint(request: DocumentProcessRequest):
    """Submit a document for processing"""
    # You could adjust task priority based on the request
    queue = "high_priority" if request.priority == "high" else "default"

    task = process_document.apply_async(
        args=[request.document_id, request.options],
        queue=queue
    )

    return {
        "task_id": task.id,
        "document_id": request.document_id,
        "status": "submitted"
    }
```
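Note that routing tasks to a named queue only helps if a worker actually consumes that queue. With the setup above, you would start a worker that listens to the custom queues as well as Celery's default celery queue (which the earlier .delay() calls still use), for example:
celery -A app.celery_app worker -Q celery,default,high_priority --loglevel=info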
Best Practices
When integrating Celery with FastAPI, keep these best practices in mind:
- Keep tasks idempotent - A task should produce the same result even if run multiple times
- Pass simple data to tasks - Avoid complex objects; use IDs and fetch data inside the task
- Set timeouts - Prevent tasks from running too long (see the config sketch after this list)
- Monitor your workers - Use tools like Flower (pip install flower) to monitor Celery
- Use proper serialization - JSON is recommended for most cases
- Handle results properly - Don't keep results forever if you don't need them
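As a sketch of what the timeout and result-retention settings could look like in celery_app.py (the values are arbitrary and should be tuned to your workload):

```python
celery_app.conf.update(
    task_soft_time_limit=60,  # Raise SoftTimeLimitExceeded inside the task after 60s
    task_time_limit=90,       # Forcefully terminate the task after 90s
    result_expires=3600,      # Delete stored results after one hour
)
```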
Deployment Considerations
When deploying a FastAPI + Celery application to production:
- Use a process manager like Supervisor to keep workers running
- Consider using Docker and docker-compose to simplify deployment
- Scale workers horizontally based on load
- Configure Redis with persistence if task reliability is crucial
- Implement proper logging for both the API and workers
- Set up monitoring to track task throughput and errors
Summary
In this tutorial, we've learned how to integrate Celery with FastAPI to handle background tasks efficiently. We've covered:
- Setting up a Celery application with Redis as broker and backend
- Defining Celery tasks with different complexities
- Creating FastAPI endpoints that submit tasks and check their status
- Implementing advanced features like task scheduling and error handling
- Building a real-world document processing service example
Celery provides a powerful way to offload time-consuming operations from your web server, improving responsiveness and user experience in your FastAPI applications.
Exercises
- Extend the document processing example to include a webhook notification when processing completes
- Create a dashboard endpoint that shows all currently running tasks
- Implement task cancellation functionality
- Add rate limiting to prevent users from submitting too many tasks at once
- Create a periodic task that cleans up completed tasks older than 24 hours