Django Celery Integration
Introduction
When building web applications with Django, you may encounter situations where certain tasks take too long to process within the normal request-response cycle. Examples include:
- Sending emails to multiple recipients
- Processing uploaded files
- Generating reports
- Making API calls to external services
- Running periodic maintenance tasks
These operations can significantly delay response times, leading to poor user experience. This is where Celery comes in.
Celery is a distributed task queue system that allows you to run time-consuming tasks asynchronously in the background. By integrating Celery with Django, you can offload these tasks from your web server, making your application more responsive and scalable.
What is Celery?
Celery is an asynchronous task queue/job queue based on distributed message passing. It focuses on real-time operation but supports scheduling as well. Tasks can execute concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent.
The main components of Celery are:
- Celery Client - Where tasks are defined and sent to the queue
- Message Broker - The queue system (like RabbitMQ or Redis) that stores tasks
- Celery Workers - Processes that take tasks from the queue and execute them
- Result Backend - Optional storage for task results (can also be Redis)
Prerequisites
Before we begin, make sure you have:
- A working Django project
- Python 3.6+ installed
- Basic understanding of Django views and models
- Redis or RabbitMQ installed (we'll use Redis for this tutorial)
Setting Up Celery in Django
Let's go through the step-by-step process of integrating Celery with Django:
Step 1: Install Required Packages
First, we need to install Celery and the Redis package:
pip install celery redis
Step 2: Configure Django Settings
Let's add the Celery configuration to your Django settings file (settings.py). The CELERY_ prefix on these names matches the namespace we'll pass to Celery in Step 3:
# settings.py
# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
Step 3: Create the Celery Instance
Create a new file called celery.py in your Django project directory (the same directory that contains the settings.py file):
# myproject/celery.py
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Create the Celery app
app = Celery('myproject')
# Load configuration from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks in all installed apps
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
Step 4: Update the Project's __init__.py File
Update your project's __init__.py file to ensure the Celery app is loaded when Django starts:
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
Step 5: Create Tasks
Create a file called tasks.py in one of your Django apps:
# myapp/tasks.py
from celery import shared_task
import time
@shared_task
def add(x, y):
    return x + y


@shared_task
def long_running_task(duration=10):
    """Simulate a long-running process"""
    time.sleep(duration)
    return f"Task completed after {duration} seconds!"


@shared_task
def send_email_notification(user_id, message):
    from django.contrib.auth.models import User
    from django.core.mail import send_mail

    user = User.objects.get(id=user_id)
    send_mail(
        'Notification',
        message,
        '[email protected]',
        [user.email],
        fail_silently=False,
    )
    return f"Email sent to {user.email}"
Using Celery in Django Views
Now let's see how to use Celery tasks in your Django views:
Simple Task Execution
# myapp/views.py
from django.http import JsonResponse
from .tasks import add, long_running_task, send_email_notification
def trigger_task(request):
    # Run a simple addition task
    result = add.delay(5, 3)
    # result.id is the task ID, which can be used to check the status later
    return JsonResponse({"task_id": result.id})


def send_notification(request, user_id):
    # Run the email-sending task in the background
    task = send_email_notification.delay(
        user_id,
        "This is a notification message."
    )
    return JsonResponse({
        "message": "Notification task started",
        "task_id": task.id
    })
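Note that delay() is a convenience shortcut for apply_async(), which accepts execution options. A quick sketch (the countdown and queue values here are illustrative):
# Equivalent call with execution options
send_email_notification.apply_async(
    args=[user_id, "This is a notification message."],
    countdown=30,     # start no earlier than 30 seconds from now
    queue='emails',   # requires a worker consuming the 'emails' queue
)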
Tracking Task Status
To check the status of a task, you can do:
# myapp/views.py
from celery.result import AsyncResult
from django.http import JsonResponse
def check_task(request, task_id):
    result = AsyncResult(task_id)
    response = {
        "task_id": task_id,
        "status": result.status,
    }
    # Add the result if the task has completed
    if result.ready():
        response["result"] = result.get()
    return JsonResponse(response)
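These views need URL routes as well; a sketch of entries you might add to your app's urls.py (the path strings and names here are illustrative):
# myapp/urls.py (illustrative entries)
from django.urls import path
from . import views

urlpatterns = [
    path('trigger-task/', views.trigger_task, name='trigger_task'),
    path('check-task/<str:task_id>/', views.check_task, name='check_task'),
]
One caveat: result.get() re-raises the task's exception if the task failed, so in production code you may want to inspect result.status before fetching the result.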
Scheduled Tasks with Celery Beat
Celery Beat is a scheduler that kicks off tasks at regular intervals. Beat itself ships with Celery; the django-celery-beat package adds database-backed schedules that you can manage from the Django admin:
Step 1: Install django-celery-beat
pip install django-celery-beat
Step 2: Add to Installed Apps
# settings.py
INSTALLED_APPS = [
    # ... other apps
    'django_celery_beat',
]
Step 3: Run Migrations
python manage.py migrate
Step 4: Define Periodic Tasks
You can define periodic tasks in your settings.py:
# settings.py
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
    'daily-cleanup-at-midnight': {
        'task': 'myapp.tasks.cleanup_database',
        'schedule': crontab(hour=0, minute=0),  # Midnight
    },
    'weekly-report-every-monday': {
        'task': 'myapp.tasks.generate_weekly_report',
        'schedule': crontab(day_of_week=1, hour=7, minute=30),  # Monday at 7:30am
    },
}
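The last two schedule entries reference tasks we haven't defined yet. Minimal stubs so the schedule resolves (the actual cleanup and report logic is application-specific):
# myapp/tasks.py
from celery import shared_task

@shared_task
def cleanup_database():
    # Placeholder: e.g. purge expired sessions or stale rows
    return "cleanup complete"

@shared_task
def generate_weekly_report():
    # Placeholder: e.g. aggregate last week's data and email it
    return "report generated"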
Running Celery
To run Celery, start the worker and, if you use periodic tasks, the beat scheduler, each in its own terminal window.
Start the Celery Worker
celery -A myproject worker -l INFO
Start the Celery Beat Scheduler (Optional, for periodic tasks)
celery -A myproject beat -l INFO
If you're using django-celery-beat for dynamic scheduling:
celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
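For local development it can be convenient to embed the beat scheduler in the worker process with the -B flag (not recommended for production, where worker and beat should run as separate processes):
celery -A myproject worker -B -l INFO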
Real-World Example: File Processing Application
Let's create a more complete example. Imagine we have a Django application that allows users to upload CSV files that need to be processed.
Create the Models
# myapp/models.py
from django.db import models
from django.contrib.auth.models import User
class CSVFile(models.Model):
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    file = models.FileField(upload_to='csv_files/')
    uploaded_by = models.ForeignKey(User, on_delete=models.CASCADE)
    uploaded_at = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='pending')
    result_data = models.JSONField(null=True, blank=True)

    def __str__(self):
        return f"{self.file.name} - {self.status}"
Create the Tasks
# myapp/tasks.py
import csv
import time
from celery import shared_task
from .models import CSVFile
@shared_task
def process_csv_file(csv_file_id):
    try:
        # Get the file object
        csv_file_obj = CSVFile.objects.get(id=csv_file_id)
    except CSVFile.DoesNotExist:
        # Don't re-query in the generic handler below; report the missing row directly
        return {
            'file_id': csv_file_id,
            'status': 'error',
            'error': 'CSVFile not found'
        }

    try:
        # Update status to processing
        csv_file_obj.status = 'processing'
        csv_file_obj.save()

        # Simulate processing time
        time.sleep(5)

        # Process the CSV file
        data = []
        with open(csv_file_obj.file.path, 'r') as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                data.append(row)

        # Update the model with results
        csv_file_obj.status = 'completed'
        csv_file_obj.result_data = {
            'total_rows': len(data),
            'processed_data': data[:5]  # Store first 5 rows as sample
        }
        csv_file_obj.save()

        return {
            'file_id': csv_file_id,
            'rows_processed': len(data),
            'status': 'success'
        }
    except Exception as e:
        # Update status to failed
        csv_file_obj.status = 'failed'
        csv_file_obj.save()
        return {
            'file_id': csv_file_id,
            'status': 'error',
            'error': str(e)
        }
Create the Views
# myapp/views.py
from django.http import JsonResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth.decorators import login_required
from .models import CSVFile
from .tasks import process_csv_file
from .forms import CSVFileForm
@login_required
def upload_csv(request):
    if request.method == 'POST':
        form = CSVFileForm(request.POST, request.FILES)
        if form.is_valid():
            csv_file = form.save(commit=False)
            csv_file.uploaded_by = request.user
            csv_file.save()
            # Trigger the Celery task
            process_csv_file.delay(csv_file.id)
            return redirect('file_detail', file_id=csv_file.id)
    else:
        form = CSVFileForm()
    return render(request, 'myapp/upload.html', {'form': form})


@login_required
def file_detail(request, file_id):
    csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)
    return render(request, 'myapp/file_detail.html', {'file': csv_file})


@login_required
def file_status(request, file_id):
    """AJAX endpoint to check file processing status"""
    csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)
    return JsonResponse({
        'id': csv_file.id,
        'status': csv_file.status,
        'completed': csv_file.status in ['completed', 'failed'],
        'result': csv_file.result_data
    })
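One caveat about upload_csv: if the view runs inside a database transaction (for example with ATOMIC_REQUESTS = True), a worker can pick up the task before the new CSVFile row is committed, and the task's initial get() will fail. A common remedy is to enqueue the task only after the transaction commits; a sketch of the change:
# Inside upload_csv, instead of calling process_csv_file.delay(csv_file.id) directly:
from django.db import transaction

transaction.on_commit(lambda: process_csv_file.delay(csv_file.id))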
Create the Forms
# myapp/forms.py
from django import forms
from .models import CSVFile
class CSVFileForm(forms.ModelForm):
    class Meta:
        model = CSVFile
        fields = ['file']
Create the Templates
<!-- myapp/templates/myapp/upload.html -->
<h1>Upload CSV File</h1>
<form method="post" enctype="multipart/form-data">
    {% csrf_token %}
    {{ form.as_p }}
    <button type="submit">Upload</button>
</form>
<!-- myapp/templates/myapp/file_detail.html -->
<h1>File Details</h1>
<div id="file-info">
    <p>File: {{ file.file.name }}</p>
    <p>Status: <span id="status">{{ file.status }}</span></p>
    <p>Uploaded: {{ file.uploaded_at }}</p>
</div>
<div id="result-container" style="display: none;">
    <h2>Results</h2>
    <pre id="result-data"></pre>
</div>
<script>
    function checkStatus() {
        fetch('/file-status/{{ file.id }}/')
            .then(response => response.json())
            .then(data => {
                document.getElementById('status').textContent = data.status;
                if (data.completed) {
                    clearInterval(statusChecker);
                    if (data.status === 'completed') {
                        document.getElementById('result-container').style.display = 'block';
                        document.getElementById('result-data').textContent = JSON.stringify(data.result, null, 2);
                    }
                }
            });
    }

    // Check status every 2 seconds
    const statusChecker = setInterval(checkStatus, 2000);
    // Initial check
    checkStatus();
</script>
Create the URLs
# myapp/urls.py
from django.urls import path
from . import views
urlpatterns = [
    path('upload/', views.upload_csv, name='upload_csv'),
    path('file/<int:file_id>/', views.file_detail, name='file_detail'),
    path('file-status/<int:file_id>/', views.file_status, name='file_status'),
]
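The template above fetches /file-status/<id>/ with a leading slash, which assumes the app's URLs are included at the site root. A sketch of a project-level myproject/urls.py under that assumption:
# myproject/urls.py (assumes myapp's URLs are mounted at the site root)
from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('myapp.urls')),
]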
Best Practices
- Idempotent Tasks: Design tasks so they can be safely retried if they fail.
- Task Timeouts: Set timeouts for long-running tasks so worker processes aren't tied up indefinitely (see the sketch after this list).
- Error Handling: Always implement proper error handling in your tasks.
- Task Monitoring: Use tools like Flower (a Celery monitoring tool) to monitor your tasks:
pip install flower
celery -A myproject flower
- Task Routing: For complex applications, route different types of tasks to different queues/workers:
# settings.py
CELERY_TASK_ROUTES = {
    'myapp.tasks.send_email_notification': {'queue': 'emails'},
    'myapp.tasks.process_csv_file': {'queue': 'data_processing'},
}
- Resource Management: Be mindful of memory leaks in long-running workers. Consider restarting workers periodically.
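Here is the timeout sketch promised above. Celery's prefork pool supports a soft limit (which raises an exception the task can catch) and a hard limit (which kills and replaces the worker process); the values below are illustrative, and do_work/cleanup are hypothetical stand-ins for your own code:
# myapp/tasks.py
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(soft_time_limit=60, time_limit=120)
def bounded_task():
    try:
        do_work()  # hypothetical long-running function
    except SoftTimeLimitExceeded:
        cleanup()  # hypothetical: release resources before the hard limit hits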
Common Issues and Solutions
Tasks Are Not Being Executed
- Ensure the Celery worker is running
- Verify the broker (Redis/RabbitMQ) is running
- Check for connection issues to the broker
- Look at the Celery logs for errors
Tasks Are Getting Lost
- Implement task retries:
@shared_task(bind=True, max_retries=3)
def my_task(self):
    try:
        # Some code that might fail
        pass
    except Exception as e:
        raise self.retry(exc=e, countdown=60)  # Retry after 60 seconds
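Since Celery 4 you can also declare retries on the decorator instead of calling self.retry() by hand; a sketch (which exception types to retry on is up to you):
@shared_task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def my_flaky_task():
    ...  # code that might raise ConnectionError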
High Memory Usage
- Limit how many tasks each worker child process executes before it is replaced; recycling the process reclaims any leaked memory:
celery -A myproject worker -l INFO --max-tasks-per-child=100
- The same limit can be set in settings.py:
# settings.py
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
Summary
Django and Celery integration provides a powerful solution for handling asynchronous tasks in your web applications. By offloading time-consuming operations to background workers, you can keep your application responsive and provide a better user experience.
In this guide, we've covered:
- Setting up Celery with Django
- Creating and executing asynchronous tasks
- Implementing scheduled tasks with Celery Beat
- Building a complete file processing application
- Best practices for Celery implementation
With these tools and knowledge, you can implement efficient background processing in your Django applications to handle everything from simple periodic tasks to complex data processing operations.
Additional Resources
- Celery Documentation
- Django Celery Integration Guide
- Django Celery Beat
- Flower: Celery Monitoring Tool
Exercises
- Email Newsletter System: Create a Django application that allows administrators to send email newsletters to users. Use Celery to handle the email sending in the background.
- Report Generation: Build a system that generates PDF reports from database data. The report generation should happen asynchronously with Celery.
- Scheduled Data Backup: Implement a Celery Beat task that backs up your database daily during off-peak hours.
- API Integration: Create a system that fetches data from an external API and processes it in the background using Celery tasks.
- Task Progress Bar: Extend the file processing example to show a progress bar during CSV processing using WebSockets and Celery's task progress tracking.