Django Celery Integration

Introduction

When building web applications with Django, you may encounter situations where certain tasks take too long to process within the normal request-response cycle. Examples include:

  • Sending emails to multiple recipients
  • Processing uploaded files
  • Generating reports
  • Making API calls to external services
  • Running periodic maintenance tasks

These operations can significantly delay response times, leading to poor user experience. This is where Celery comes in.

Celery is a distributed task queue system that allows you to run time-consuming tasks asynchronously in the background. By integrating Celery with Django, you can offload these tasks from your web server, making your application more responsive and scalable.

What is Celery?

Celery is an asynchronous task queue/job queue based on distributed message passing. It focuses on real-time operation but supports scheduling as well. Tasks can execute concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent.

The main components of Celery are:

  1. Celery Client - Where tasks are defined and sent to the queue
  2. Message Broker - The queue system (like RabbitMQ or Redis) that stores tasks
  3. Celery Workers - Processes that take tasks from the queue and execute them
  4. Result Backend - Optional storage for task results (can also be Redis)
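
In code, the flow looks like this (a minimal preview; the add task is defined in Step 5 below):

python
# Client: serialize the call and push a message onto the broker's queue
result = add.delay(2, 2)

# A worker picks the message up, runs add(2, 2), and stores the return
# value in the result backend, where the client can fetch it
print(result.get(timeout=10))  # -> 4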

Prerequisites

Before we begin, make sure you have:

  • A working Django project
  • Python 3.7+ installed (Celery 5.2 and later no longer support Python 3.6)
  • Basic understanding of Django views and models
  • Redis or RabbitMQ installed (we'll use Redis for this tutorial)

Setting Up Celery in Django

Let's go through the step-by-step process of integrating Celery with Django:

Step 1: Install Required Packages

First, we need to install Celery and the redis client library for Python:

bash
pip install celery redis

Step 2: Configure Django Settings

Let's add the Celery configuration to your Django settings file (settings.py):

python
# settings.py

# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
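
The CELERY_ prefix is significant: in Step 3 we load configuration with the CELERY namespace, so Celery strips the prefix and lowercases the rest (CELERY_BROKER_URL becomes broker_url, and so on). Every Celery setting in settings.py must therefore start with CELERY_.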

Step 3: Create the Celery Instance

Create a new file called celery.py in your Django project directory (the same directory that contains the settings.py file):

python
# myproject/celery.py
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

# Create the Celery app
app = Celery('myproject')

# Load configuration from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')

# Auto-discover tasks in all installed apps
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Step 4: Update the Project's __init__.py File

Update your project's __init__.py file to ensure the Celery app is loaded when Django starts:

python
# myproject/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)

Step 5: Create Tasks

Create a file called tasks.py in one of your Django apps:

python
# myapp/tasks.py
from celery import shared_task
import time

@shared_task
def add(x, y):
    return x + y

@shared_task
def long_running_task(duration=10):
    """Simulate a long-running process"""
    time.sleep(duration)
    return f"Task completed after {duration} seconds!"

@shared_task
def send_email_notification(user_id, message):
    from django.contrib.auth.models import User
    from django.core.mail import send_mail

    user = User.objects.get(id=user_id)

    send_mail(
        'Notification',
        message,
        '[email protected]',  # sender address; replace with your own
        [user.email],
        fail_silently=False,
    )

    return f"Email sent to {user.email}"

Using Celery in Django Views

Now let's see how to use Celery tasks in your Django views:

Simple Task Execution

python
# myapp/views.py
from django.http import JsonResponse
from .tasks import add, long_running_task, send_email_notification

def trigger_task(request):
    # Run a simple addition task
    result = add.delay(5, 3)

    # result.id is the task ID that can be used to check the status later
    return JsonResponse({"task_id": result.id})

def send_notification(request, user_id):
    # Run the email sending task in the background
    task = send_email_notification.delay(
        user_id,
        "This is a notification message."
    )

    return JsonResponse({
        "message": "Notification task started",
        "task_id": task.id
    })
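
delay() is a convenience shortcut. When you need more control, such as delaying execution or routing to a specific queue, use apply_async(), which takes the task arguments as a tuple plus scheduling options:

python
# Same call as add.delay(5, 3), but executed no earlier than 10 seconds from now
add.apply_async((5, 3), countdown=10)

# Route the task to a named queue instead of the default one
add.apply_async((5, 3), queue='low_priority')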

Tracking Task Status

To check the status of a task, look it up by its ID with AsyncResult:

python
# myapp/views.py
from celery.result import AsyncResult
from django.http import JsonResponse

def check_task(request, task_id):
    result = AsyncResult(task_id)

    response = {
        "task_id": task_id,
        "status": result.status,
    }

    # Add the result if the task is completed
    if result.ready():
        response["result"] = result.get()

    return JsonResponse(response)
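
Note that result.get() re-raises any exception the task itself raised; wrap the call in try/except, or use result.get(propagate=False), if the task may have failed.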

Scheduled Tasks with Celery Beat

Celery Beat is a scheduler that helps you run tasks periodically:

Step 1: Install django-celery-beat

bash
pip install django-celery-beat

Step 2: Add to Installed Apps

python
# settings.py
INSTALLED_APPS = [
    # ... other apps
    'django_celery_beat',
]

Step 3: Run Migrations

bash
python manage.py migrate

Step 4: Define Periodic Tasks

You can define periodic tasks in your settings.py:

python
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
    'daily-cleanup-at-midnight': {
        'task': 'myapp.tasks.cleanup_database',
        'schedule': crontab(hour=0, minute=0),  # Midnight
    },
    'weekly-report-every-monday': {
        'task': 'myapp.tasks.generate_weekly_report',
        'schedule': crontab(day_of_week=1, hour=7, minute=30),  # Monday at 7:30am
    },
}
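
The last two entries reference tasks that weren't defined earlier; here is a minimal sketch of what they might look like (hypothetical bodies, shown only so the schedule resolves):

python
# myapp/tasks.py
from celery import shared_task

@shared_task
def cleanup_database():
    # e.g. purge expired sessions and stale records
    pass

@shared_task
def generate_weekly_report():
    # e.g. aggregate last week's data and email a summary
    pass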

Running Celery

To run Celery, you need to start both the worker and (optionally) the beat scheduler in separate terminal windows.

Start the Celery Worker

bash
celery -A myproject worker -l INFO

Start the Celery Beat Scheduler (Optional, for periodic tasks)

bash
celery -A myproject beat -l INFO

If you're using django-celery-beat for dynamic scheduling:

bash
celery -A myproject beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
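
With the database scheduler, periodic tasks live in the database rather than in settings.py, so you can add, edit, and disable them at runtime through the Django admin.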

Real-World Example: File Processing Application

Let's create a more complete example. Imagine we have a Django application that allows users to upload CSV files that need to be processed.

Create the Models

python
# myapp/models.py
from django.db import models
from django.contrib.auth.models import User

class CSVFile(models.Model):
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    file = models.FileField(upload_to='csv_files/')
    uploaded_by = models.ForeignKey(User, on_delete=models.CASCADE)
    uploaded_at = models.DateTimeField(auto_now_add=True)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='pending')
    result_data = models.JSONField(null=True, blank=True)

    def __str__(self):
        return f"{self.file.name} - {self.status}"

Create the Tasks

python
# myapp/tasks.py
import csv
import time
from celery import shared_task
from .models import CSVFile

@shared_task
def process_csv_file(csv_file_id):
    # Fetch the file object first; if it doesn't exist,
    # there is nothing to mark as failed
    csv_file_obj = CSVFile.objects.get(id=csv_file_id)

    try:
        # Update status to processing
        csv_file_obj.status = 'processing'
        csv_file_obj.save()

        # Simulate processing time
        time.sleep(5)

        # Process the CSV file
        data = []
        with open(csv_file_obj.file.path, 'r') as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                data.append(row)

        # Update the model with results
        csv_file_obj.status = 'completed'
        csv_file_obj.result_data = {
            'total_rows': len(data),
            'processed_data': data[:5]  # Store first 5 rows as a sample
        }
        csv_file_obj.save()

        return {
            'file_id': csv_file_id,
            'rows_processed': len(data),
            'status': 'success'
        }
    except Exception as e:
        # Mark the file as failed; we reuse the object fetched above
        # rather than re-querying, which could itself fail
        csv_file_obj.status = 'failed'
        csv_file_obj.save()

        return {
            'file_id': csv_file_id,
            'status': 'error',
            'error': str(e)
        }

Create the Views

python
# myapp/views.py
from django.http import JsonResponse
from django.shortcuts import render, redirect, get_object_or_404
from django.contrib.auth.decorators import login_required

from .models import CSVFile
from .tasks import process_csv_file
from .forms import CSVFileForm

@login_required
def upload_csv(request):
    if request.method == 'POST':
        form = CSVFileForm(request.POST, request.FILES)
        if form.is_valid():
            csv_file = form.save(commit=False)
            csv_file.uploaded_by = request.user
            csv_file.save()

            # Trigger the Celery task
            process_csv_file.delay(csv_file.id)

            return redirect('file_detail', file_id=csv_file.id)
    else:
        form = CSVFileForm()

    return render(request, 'myapp/upload.html', {'form': form})

@login_required
def file_detail(request, file_id):
    csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)
    return render(request, 'myapp/file_detail.html', {'file': csv_file})

@login_required
def file_status(request, file_id):
    """AJAX endpoint to check file processing status"""
    csv_file = get_object_or_404(CSVFile, id=file_id, uploaded_by=request.user)

    return JsonResponse({
        'id': csv_file.id,
        'status': csv_file.status,
        'completed': csv_file.status in ['completed', 'failed'],
        'result': csv_file.result_data
    })

Create the Forms

python
# myapp/forms.py
from django import forms
from .models import CSVFile

class CSVFileForm(forms.ModelForm):
    class Meta:
        model = CSVFile
        fields = ['file']

Create the Templates

html
<!-- myapp/templates/myapp/upload.html -->
<h1>Upload CSV File</h1>
<form method="post" enctype="multipart/form-data">
    {% csrf_token %}
    {{ form.as_p }}
    <button type="submit">Upload</button>
</form>

html
<!-- myapp/templates/myapp/file_detail.html -->
<h1>File Details</h1>
<div id="file-info">
    <p>File: {{ file.file.name }}</p>
    <p>Status: <span id="status">{{ file.status }}</span></p>
    <p>Uploaded: {{ file.uploaded_at }}</p>
</div>

<div id="result-container" style="display: none;">
    <h2>Results</h2>
    <pre id="result-data"></pre>
</div>

<script>
function checkStatus() {
    fetch('/file-status/{{ file.id }}/')
        .then(response => response.json())
        .then(data => {
            document.getElementById('status').textContent = data.status;

            if (data.completed) {
                clearInterval(statusChecker);
                if (data.status === 'completed') {
                    document.getElementById('result-container').style.display = 'block';
                    document.getElementById('result-data').textContent = JSON.stringify(data.result, null, 2);
                }
            }
        });
}

// Check status every 2 seconds
const statusChecker = setInterval(checkStatus, 2000);

// Initial check
checkStatus();
</script>

Create the URLs

python
# myapp/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('upload/', views.upload_csv, name='upload_csv'),
    path('file/<int:file_id>/', views.file_detail, name='file_detail'),
    path('file-status/<int:file_id>/', views.file_status, name='file_status'),
]
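
The file_detail template above fetches /file-status/<id>/ from the site root, so this assumes the app's URLs are included without a prefix in the project urls.py (adjust the fetch URL if you mount them elsewhere):

python
# myproject/urls.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('myapp.urls')),
]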

Best Practices

  1. Idempotent Tasks: Design tasks so they can be safely retried if they fail.

  2. Task Timeouts: Set timeouts for long-running tasks to avoid worker processes being tied up indefinitely (see the sketch after this list).

  3. Error Handling: Always implement proper error handling in your tasks; the same sketch shows declarative retries for transient failures.

  4. Task Monitoring: Use a tool like Flower, a web-based monitoring dashboard for Celery, to keep an eye on your tasks:

    bash
    pip install flower
    celery -A myproject flower
  5. Task Routing: For complex applications, route different types of tasks to different queues/workers:

    python
    # settings.py
    CELERY_TASK_ROUTES = {
        'myapp.tasks.send_email_notification': {'queue': 'emails'},
        'myapp.tasks.process_csv_file': {'queue': 'data_processing'},
    }
  6. Resource Management: Be mindful of memory leaks in long-running workers. Consider restarting workers periodically.
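
As a sketch of points 2 and 3 above, time limits can be set globally in settings.py and retries declared per task (the values here are illustrative):

python
# settings.py
CELERY_TASK_TIME_LIMIT = 300       # hard-kill any task after 5 minutes
CELERY_TASK_SOFT_TIME_LIMIT = 240  # raise SoftTimeLimitExceeded at 4 minutes

# myapp/tasks.py
from celery import shared_task

@shared_task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def flaky_api_call():
    # Celery catches ConnectionError, retries with exponential
    # backoff, and gives up after 3 attempts
    ...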

Common Issues and Solutions

Tasks Are Not Being Executed

  • Ensure the Celery worker is running
  • Verify the broker (Redis/RabbitMQ) is running
  • Check for connection issues to the broker
  • Look at the Celery logs for errors
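
A couple of quick checks, assuming the Redis broker used in this tutorial:

bash
# Is Redis reachable?
redis-cli ping                    # should print PONG

# Are any workers alive and responding?
celery -A myproject inspect ping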

Tasks Are Getting Lost

  • Implement task retries:
    python
    @shared_task(bind=True, max_retries=3)
    def my_task(self):
        try:
            # Some code that might fail
            pass
        except Exception as e:
            raise self.retry(exc=e, countdown=60)  # Retry after 60 seconds

High Memory Usage

  • Periodically restart worker processes. Rather than orchestrating restarts from inside a task, use the worker's built-in recycling options:
    bash
    # Recycle each pool process after it has executed 100 tasks,
    # or once it exceeds ~200 MB of resident memory (value in KiB)
    celery -A myproject worker -l INFO --max-tasks-per-child=100 --max-memory-per-child=200000

Summary

Django and Celery integration provides a powerful solution for handling asynchronous tasks in your web applications. By offloading time-consuming operations to background workers, you can keep your application responsive and provide a better user experience.

In this guide, we've covered:

  • Setting up Celery with Django
  • Creating and executing asynchronous tasks
  • Implementing scheduled tasks with Celery Beat
  • Building a complete file processing application
  • Best practices for Celery implementation

With these tools and knowledge, you can implement efficient background processing in your Django applications to handle everything from simple periodic tasks to complex data processing operations.

Additional Resources

  1. Celery Documentation
  2. Django Celery Integration Guide
  3. Django Celery Beat
  4. Flower: Celery Monitoring Tool

Exercises

  1. Email Newsletter System: Create a Django application that allows administrators to send email newsletters to users. Use Celery to handle the email sending in the background.

  2. Report Generation: Build a system that generates PDF reports from database data. The report generation should happen asynchronously with Celery.

  3. Scheduled Data Backup: Implement a Celery Beat task that backs up your database daily during off-peak hours.

  4. API Integration: Create a system that fetches data from an external API and processes it in the background using Celery tasks.

  5. Task Progress Bar: Extend the file processing example to show a progress bar during CSV processing using WebSockets and Celery's task progress tracking.


