Django Celery Integration
When building web applications with Django, you may encounter situations where certain tasks take too long to process within the normal request-response cycle. Examples include:
- Sending emails to multiple recipients
- Processing uploaded files
- Generating reports
- Making API calls to external services
- Running periodic maintenance tasks
These operations can significantly delay response times, leading to poor user experience. This is where Celery comes in.
Celery is a distributed task queue system that allows you to run time-consuming tasks asynchronously in the background. By integrating Celery with Django, you can offload these tasks from your web server, making your application more responsive and scalable.
What is Celery?
Celery is an asynchronous task queue/job queue based on distributed message passing. It focuses on real-time operation but supports scheduling as well. Tasks can execute concurrently on one or more worker servers using multiprocessing.
The main components of Celery are:
- Celery Client - Where tasks are defined and sent to the queue
- Message Broker - The queue system (like RabbitMQ or Redis) that stores tasks
- Celery Workers - Processes that take tasks from the queue and execute them
- Result Backend - Optional storage for task results (can also be Redis)
Before we begin, make sure you have:
- A working Django project
- Python 3.6+ installed
- Basic understanding of Django views and models
- Redis or RabbitMQ installed (we'll use Redis for this tutorial)
Setting Up Celery in Django
Let's go through the step-by-step process of integrating Celery with Django:
Step 1: Install Required Packages
First, we need to install Celery and the Redis package:
pip install celery redis
Step 2: Configure Django Settings
Let's add the Celery configuration to your Django settings file (
# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
Step 3: Create the Celery Instance
Create a new file called
in your Django project directory (the same directory that contains the
# myproject/
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# Create the Celery app
app = Celery('myproject')
# Load configuration from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks in all installed apps
def debug_task(self):
print(f'Request: {self.request!r}')
Step 4: Update the Project's
Update your project's
file to ensure the Celery app is loaded when Django starts:
# myproject/
from .celery import app as celery_app
__all__ = ('celery_app',)
Step 5: Create Tasks
Create a file called
in one of your Django apps:
# myapp/
from celery import shared_task
import time
def add(x, y):
return x + y
def long_running_task(duration=10):
"""Simulate a long-running process"""
return f"Task completed after {duration} seconds!"
def send_email_notification(user_id, message):
from django.contrib.auth.models import User
from django.core.mail import send_mail
user = User.objects.get(id=user_id)
'[email protected]',
return f"Email sent to {}"
Using Celery in Django Views
Now let's see how to use Celery tasks in your Django views:
Simple Task Execution
# myapp/
from django.http import JsonResponse
from .tasks import add, long_running_task, send_email_notification
def trigger_task(request):
# Run a simple addition task
result = add.delay(5, 3)
# The is the task ID that can be used to check the status later
return JsonResponse({"task_id":})
def send_notification(request, user_id):
# Run email sending task in the background
task = send_email_notification.delay(
"This is a notification message."
return JsonResponse({
"message": "Notification task started",
Tracking Task Status
To check the status of a task, you can do:
# myapp/
from celery.result import AsyncResult
from django.http import JsonResponse
def check_task(request, task_id):
result = AsyncResult(task_id)
response = {
"task_id": task_id,
"status": result.status,
# Add the result if the task is completed
if result.ready():
response["result"] = result.get()
return JsonResponse(response)
Scheduled Tasks with Celery Beat
Celery Beat is a scheduler that helps you run tasks periodically:
Step 1: Install Celery Beat
pip install django-celery-beat
Step 2: Add to Installed Apps
# ... other apps
Step 3: Run Migrations
python migrate
Step 4: Define Periodic Tasks
You can define periodic tasks in your
from celery.schedules import crontab
'add-every-30-seconds': {
'task': 'myapp.tasks.add',
'schedule': 30.0,
'args': (16, 16),
'daily-cleanup-at-midnight': {
'task': 'myapp.tasks.cleanup_database',
'schedule': crontab(hour=0, minute=0), # Midnight
'weekly-report-every-monday': {
'task': 'myapp.tasks.generate_weekly_report',
'schedule': crontab(day_of_week=1, hour=7, minute=30), # Monday at 7:30am
Running Celery
To run Celery, you need to start both the worker and (optionally) the beat scheduler in separate terminal windows.
Start the Celery Worker
celery -A myproject worker -l INFO
Start the Celery Beat Scheduler (Optional, for periodic tasks)
celery -A myproject beat -l INFO
If you're using django-celery-beat for dynamic scheduling:
celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
Real-World Example: File Processing Application
Let's create a more complete example. Imagine we have a Django application that allows users to upload CSV files that need to be processed.
Create the Models
# myapp/
from django.db import models
from django.contrib.auth.models import User
class CSVFile(models.Model):
('pending', 'Pending'),
('processing', 'Processing'),
('completed', 'Completed'),
('failed', 'Failed'),
file = models.FileField(upload_to='csv_files/')
uploaded_by = models.ForeignKey(User, on_delete=models.CASCADE)
uploaded_at = models.DateTimeField(auto_now_add=True)
status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='pending')
result_data = models.JSONField(null=True, blank=True)
def __str__(self):
return f"{} - {self.status}"
Create the Tasks
# myapp/
import csv
import time
from celery import shared_task
from .models import CSVFile
def process_csv_file(csv_file_id):
# Get the file object
csv_file_obj = CSVFile.objects.get(id=csv_file_id)
# Update status to processing
csv_file_obj.status = 'processing'
# Simulate processing time
# Process the CSV file
data = []
with open(csv_file_obj.file.path, 'r') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
# Update the model with results
csv_file_obj.status = 'completed'
csv_file_obj.result_data = {
'total_rows': len(data),
'processed_data': data[:5] # Store first 5 rows as sample
return {
'file_id': csv_file_id,
'rows_processed': len(data),
'status': 'success'
except Exception as e:
# Update status to failed
csv_file_obj = CSVFile.objects.get(id=csv_file_id)
csv_file_obj.status = 'failed'
return {
'file_id': csv_file_id,
'status': 'error',
'error': str(e)
Create the Views
# myapp/
from django.http import JsonResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth.decorators import login_required
from .models import CSVFile
from .tasks import process_csv_file
from .forms import CSVFileForm
def upload_csv(request):
if request.method == 'POST':
form = CSVFileForm(request.POST, request.FILES)
if form.is_valid():
csv_file =
csv_file.uploaded_by = request.user
# Trigger the Celery task
return redirect('file_detail',
form = CSVFileForm()
return render(request, 'myapp/upload.html', {'form': form})
def file_detail(request, file_id):
csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)
return render(request, 'myapp/file_detail.html', {'file': csv_file})
def file_status(request, file_id):
"""AJAX endpoint to check file processing status"""
csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)
return JsonResponse({
'status': csv_file.status,
'completed': csv_file.status in ['completed', 'failed'],
'result': csv_file.result_data
Create the Forms
# myapp/
from django import forms
from .models import CSVFile
class CSVFileForm(forms.ModelForm):
class Meta:
model = CSVFile
fields = ['file']
Create the Templates
<!-- myapp/templates/myapp/upload.html -->
<h1>Upload CSV File</h1>
<form method="post" enctype="multipart/form-data">
{% csrf_token %}
{{ form.as_p }}
<button type="submit">Upload</button>
<!-- myapp/templates/myapp/file_detail.html -->
<h1>File Details</h1>
<div id="file-info">
<p>File: {{ }}</p>
<p>Status: <span id="status">{{ file.status }}</span></p>
<p>Uploaded: {{ file.uploaded_at }}</p>
<div id="result-container" style="display: none;">
<pre id="result-data"></pre>
function checkStatus() {
fetch('/file-status/{{ }}/')
.then(response => response.json())
.then(data => {
document.getElementById('status').textContent = data.status;
if (data.completed) {
if (data.status === 'completed') {
document.getElementById('result-container').style.display = 'block';
document.getElementById('result-data').textContent = JSON.stringify(data.result, null, 2);
// Check status every 2 seconds
const statusChecker = setInterval(checkStatus, 2000);
// Initial check
Create the URLs
# myapp/
from django.urls import path
from . import views
urlpatterns = [
path('upload/', views.upload_csv, name='upload_csv'),
path('file/<int:file_id>/', views.file_detail, name='file_detail'),
path('file-status/<int:file_id>/', views.file_status, name='file_status'),
Best Practices
Idempotent Tasks: Design tasks so they can be safely retried if they fail.
Task Timeouts: Set timeouts for long-running tasks to avoid worker processes being tied up indefinitely.
Error Handling: Always implement proper error handling in your tasks.
Task Monitoring: Use tools like Flower (a Celery monitoring tool) to monitor your tasks:
bashpip install flower
celery -A myproject flower -
Task Routing: For complex applications, route different types of tasks to different queues/workers:
'myapp.tasks.send_email_notification': {'queue': 'emails'},
'myapp.tasks.process_csv_file': {'queue': 'data_processing'},
} -
Resource Management: Be mindful of memory leaks in long-running workers. Consider restarting workers periodically.
Common Issues and Solutions
Tasks Are Not Being Executed
- Ensure the Celery worker is running
- Verify the broker (Redis/RabbitMQ) is running
- Check for connection issues to the broker
- Look at the Celery logs for errors
Tasks Are Getting Lost
- Implement task retries:
@shared_task(bind=True, max_retries=3)
def my_task(self):
# Some code that might fail
except Exception as e:
self.retry(exc=e, countdown=60) # Retry after 60 seconds
High Memory Usage
- Periodically restart workers:
def my_task(self):
current_task = current_app.current_task
if and current_task.request.delivery_info:
# Get worker name
worker_name = current_task.request.delivery_info['consumer_tag']
if some_condition_to_restart:
current_app.control.broadcast('pool_restart', arguments={'reload': True}, destination=[worker_name])
Django and Celery integration provides a powerful solution for handling asynchronous tasks in your web applications. By offloading time-consuming operations to background workers, you can keep your application responsive and provide a better user experience.
In this guide, we've covered:
- Setting up Celery with Django
- Creating and executing asynchronous tasks
- Implementing scheduled tasks with Celery Beat
- Building a complete file processing application
- Best practices for Celery implementation
With these tools and knowledge, you can implement efficient background processing in your Django applications to handle everything from simple periodic tasks to complex data processing operations.
