Skip to main content

Flask Celery Integration

Introduction

When building web applications with Flask, you may encounter scenarios where certain operations take too long to process within the standard request-response cycle. Examples include sending emails, processing images, generating reports, or calling external APIs. Having users wait for these operations to complete can lead to a poor user experience and potentially even timeout errors.

This is where Celery comes into the picture. Celery is a distributed task queue that allows you to run time-consuming tasks asynchronously. By integrating Celery with Flask, you can offload these heavy tasks to background workers, making your application more responsive and scalable.

In this guide, we'll explore how to integrate Celery with Flask, set up a message broker, create and execute tasks, and monitor task execution.

Prerequisites

Before we begin, make sure you have the following installed:

  • Python 3.6+
  • Flask
  • Basic understanding of Flask applications

Understanding Celery Architecture

Celery's architecture consists of three main components:

  1. Celery Client: Your Flask application that creates tasks
  2. Message Broker: A service that queues the tasks (typically Redis or RabbitMQ)
  3. Celery Workers: Separate processes that execute the tasks
+----------------+     +----------------+     +----------------+
| | | | | |
| Flask App | --> | Message | --> | Celery |
| (Task Client) | | Broker | | Workers |
| | | | | |
+----------------+ +----------------+ +----------------+

Setting Up the Environment

First, let's install the necessary packages:

bash
pip install flask celery redis

We'll use Redis as our message broker in this tutorial, but you could also use RabbitMQ.

Make sure you have Redis running on your local machine or server. If you're using Docker, you can start a Redis container with:

bash
docker run --name redis -d -p 6379:6379 redis

Basic Flask-Celery Integration

Let's create a basic Flask application with Celery integration:

python
# app.py
from flask import Flask, request, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialize Celery
celery = Celery(
app.name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)

# Define a Celery task
@celery.task
def long_running_task(param):
# Simulate a time-consuming task
import time
time.sleep(10) # Sleep for 10 seconds
return f"Task completed with parameter: {param}"

@app.route('/run-task', methods=['POST'])
def run_task():
data = request.json
param = data.get('param', 'default')

# Asynchronously execute the task
task = long_running_task.delay(param)

return jsonify({
'status': 'Task started',
'task_id': task.id
}), 202 # 202 Accepted

@app.route('/task-status/<task_id>')
def task_status(task_id):
task = long_running_task.AsyncResult(task_id)

if task.state == 'PENDING':
response = {
'state': task.state,
'status': 'Task is pending...'
}
elif task.state == 'FAILURE':
response = {
'state': task.state,
'status': 'Task failed',
'error': str(task.info) # Error info
}
else:
response = {
'state': task.state,
'status': 'Task completed',
'result': task.result
}

return jsonify(response)

if __name__ == '__main__':
app.run(debug=True)

Running the Application

To run this application, you'll need to:

  1. Start the Flask development server
  2. Start the Celery worker

In one terminal, start the Flask application:

bash
python app.py

In another terminal, start the Celery worker:

bash
celery -A app.celery worker --loglevel=info

Now you can test your application:

  1. Send a POST request to /run-task
  2. Get the task_id from the response
  3. Check the task status using the /task-status/<task_id> endpoint

Creating a Factory Pattern for Flask-Celery Integration

For larger applications, it's better to use the Flask application factory pattern to integrate Celery:

python
# app.py
from flask import Flask
from celery import Celery

def make_celery(app):
celery = Celery(
app.import_name,
backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL']
)
celery.conf.update(app.config)

class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)

celery.Task = ContextTask
return celery

def create_app():
app = Flask(__name__)
app.config.update(
CELERY_BROKER_URL='redis://localhost:6379/0',
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
)

# Other Flask configurations and blueprints...

return app

app = create_app()
celery = make_celery(app)

# Now define your tasks
@celery.task
def add_together(a, b):
return a + b

Real-World Examples

Let's look at some practical use cases for Celery in a Flask application:

Example 1: Sending Email Notifications

python
# tasks.py
from flask_mail import Mail, Message
from celery import shared_task
from app import app

mail = Mail(app)

@shared_task
def send_email_task(subject, recipient, body):
with app.app_context():
msg = Message(
subject=subject,
recipients=[recipient],
body=body
)
mail.send(msg)
return f"Email sent to {recipient}"

# routes.py
from flask import request, jsonify
from tasks import send_email_task

@app.route('/send-email', methods=['POST'])
def send_email():
data = request.json
subject = data.get('subject')
recipient = data.get('recipient')
body = data.get('body')

# Queue the email sending task
task = send_email_task.delay(subject, recipient, body)

return jsonify({
'message': 'Email queued for delivery',
'task_id': task.id
})

Example 2: Image Processing

python
# tasks.py
from celery import shared_task
from PIL import Image
import os

@shared_task
def resize_image(image_path, width, height):
try:
img = Image.open(image_path)
resized_img = img.resize((width, height))

# Generate the output path
filename = os.path.basename(image_path)
name, ext = os.path.splitext(filename)
output_path = f"static/resized/{name}_resized{ext}"

# Make sure the directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Save the resized image
resized_img.save(output_path)

return output_path
except Exception as e:
return str(e)

# routes.py
from flask import request, jsonify, url_for
import os
from tasks import resize_image

@app.route('/upload-image', methods=['POST'])
def upload_image():
if 'image' not in request.files:
return jsonify({'error': 'No image part'}), 400

file = request.files['image']

if file.filename == '':
return jsonify({'error': 'No selected file'}), 400

# Save the uploaded file
upload_folder = 'static/uploads'
os.makedirs(upload_folder, exist_ok=True)

file_path = os.path.join(upload_folder, file.filename)
file.save(file_path)

# Get the requested dimensions
width = int(request.form.get('width', 300))
height = int(request.form.get('height', 300))

# Queue the resizing task
task = resize_image.delay(file_path, width, height)

return jsonify({
'message': 'Image processing started',
'task_id': task.id,
'check_status_url': url_for('task_status', task_id=task.id, _external=True)
})

Advanced Celery Features

Scheduled Tasks with Celery Beat

Celery Beat allows you to schedule periodic tasks:

python
from celery.schedules import crontab

app.conf.beat_schedule = {
'daily-cleanup-task': {
'task': 'app.tasks.cleanup_old_data',
'schedule': crontab(hour=3, minute=0), # Run at 3:00 AM every day
},
'hourly-stats-update': {
'task': 'app.tasks.update_stats',
'schedule': 60 * 60, # Every hour
}
}

Start the Celery Beat scheduler:

bash
celery -A app.celery beat

Task Chains and Groups

You can chain tasks or run them in parallel:

python
# Task chains - tasks executed sequentially
from celery import chain

result = chain(
task1.s(arg1, arg2),
task2.s(arg3),
task3.s()
)()

# Task groups - tasks executed in parallel
from celery import group

tasks = group(
task1.s(arg1),
task1.s(arg2),
task1.s(arg3)
)()

Task Progress Updates

You can update task progress:

python
@celery.task(bind=True)
def long_running_task_with_progress(self, items):
total = len(items)
for i, item in enumerate(items):
# Process item
process_item(item)

# Update task state
self.update_state(
state='PROGRESS',
meta={'current': i, 'total': total, 'percent': (i / total) * 100}
)

return {'status': 'Task completed', 'result': 'Success'}

Best Practices

  1. Task Idempotency: Design tasks to be idempotent (can be executed multiple times without changing the result)
  2. Handle Failures: Always include error handling in your tasks
  3. Task Timeouts: Set timeouts for long-running tasks to prevent worker blockage
  4. Monitoring: Use monitoring tools like Flower to monitor task execution
  5. Result Backend: Consider using a persistent result backend for important tasks
  6. Task Queuing Priorities: Use different queues for different priorities

Common Issues and Solutions

Workers Not Starting

If your workers aren't starting, check that:

  • Redis (or your broker) is running
  • The broker URL is correct
  • You're running the Celery worker from the correct directory

Tasks Getting Lost

If tasks are being lost:

  • Check that the task isn't crashing due to exceptions
  • Ensure your broker is properly configured
  • Consider enabling acknowledgements

High Memory Usage

If you're experiencing high memory usage:

  • Check for memory leaks in your tasks
  • Consider enabling task retries with backoff
  • Implement concurrency limits

Summary

In this guide, we've learned how to:

  • Set up Celery with Flask
  • Create and execute asynchronous tasks
  • Check task status and results
  • Use the factory pattern for larger applications
  • Implement real-world examples like email sending and image processing
  • Explore advanced Celery features like scheduled tasks and task chains

By integrating Celery with Flask, you can significantly improve your application's responsiveness and user experience by offloading time-consuming operations to background processes.

Additional Resources

Exercises

  1. Create a Flask application that uses Celery to generate and email PDF reports
  2. Implement a file conversion service that uses Celery to convert between different formats
  3. Build a web scraping application that uses Celery to crawl websites in the background
  4. Create a data analysis pipeline where each step is a separate Celery task
  5. Implement a task monitoring dashboard using Flask and Celery's inspection API


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)