Flask Celery Integration
Introduction
When building web applications with Flask, you may encounter scenarios where certain operations take too long to process within the standard request-response cycle. Examples include sending emails, processing images, generating reports, or calling external APIs. Having users wait for these operations to complete can lead to a poor user experience and potentially even timeout errors.
This is where Celery comes into the picture. Celery is a distributed task queue that allows you to run time-consuming tasks asynchronously. By integrating Celery with Flask, you can offload these heavy tasks to background workers, making your application more responsive and scalable.
In this guide, we'll explore how to integrate Celery with Flask, set up a message broker, create and execute tasks, and monitor task execution.
Prerequisites
Before we begin, make sure you have the following:
- Python 3.6+
- Flask
- Redis (or another supported message broker)
- A basic understanding of Flask applications
Understanding Celery Architecture
Celery's architecture consists of three main components:
- Celery Client: Your Flask application that creates tasks
- Message Broker: A service that queues the tasks (typically Redis or RabbitMQ)
- Celery Workers: Separate processes that execute the tasks
```
+----------------+     +----------------+     +----------------+
|                |     |                |     |                |
|   Flask App    | --> |    Message     | --> |     Celery     |
| (Task Client)  |     |     Broker     |     |    Workers     |
|                |     |                |     |                |
+----------------+     +----------------+     +----------------+
```
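To build intuition for this flow, here is a minimal in-process stand-in written in plain Python — this is not how Celery works internally, just an illustration where a `queue.Queue` plays the broker and a thread plays the worker:

```python
import queue
import threading

# The "broker": holds queued tasks until a worker picks them up
broker = queue.Queue()
results = {}

def worker():
    # The "worker": pulls (task_id, func, args) tuples and executes them
    while True:
        task_id, func, args = broker.get()
        results[task_id] = func(*args)
        broker.task_done()

threading.Thread(target=worker, daemon=True).start()

# The "client": enqueues a task instead of running it inline
broker.put(("task-1", lambda x, y: x + y, (2, 3)))
broker.join()  # wait for the worker to drain the queue
print(results["task-1"])  # 5
```

Celery adds what this toy version lacks: persistence of queued tasks, multiple worker processes on multiple machines, retries, and result storage.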
Setting Up the Environment
First, let's install the necessary packages:
```bash
pip install flask celery redis
```
We'll use Redis as our message broker in this tutorial, but you could also use RabbitMQ.
Make sure you have Redis running on your local machine or server. If you're using Docker, you can start a Redis container with:
```bash
docker run --name redis -d -p 6379:6379 redis
```
Basic Flask-Celery Integration
Let's create a basic Flask application with Celery integration:
```python
# app.py
from flask import Flask, request, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialize Celery
celery = Celery(
    app.name,
    broker=app.config['CELERY_BROKER_URL'],
    backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)

# Define a Celery task
@celery.task
def long_running_task(param):
    # Simulate a time-consuming task
    import time
    time.sleep(10)  # Sleep for 10 seconds
    return f"Task completed with parameter: {param}"

@app.route('/run-task', methods=['POST'])
def run_task():
    data = request.json
    param = data.get('param', 'default')
    # Asynchronously execute the task
    task = long_running_task.delay(param)
    return jsonify({
        'status': 'Task started',
        'task_id': task.id
    }), 202  # 202 Accepted

@app.route('/task-status/<task_id>')
def task_status(task_id):
    task = long_running_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': 'Task is pending...'
        }
    elif task.state == 'FAILURE':
        response = {
            'state': task.state,
            'status': 'Task failed',
            'error': str(task.info)  # Error info
        }
    elif task.state == 'SUCCESS':
        response = {
            'state': task.state,
            'status': 'Task completed',
            'result': task.result
        }
    else:
        # STARTED, RETRY, or a custom state such as PROGRESS
        response = {
            'state': task.state,
            'status': 'Task is in progress...'
        }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)
```
Running the Application
To run this application, you'll need to:
- Start the Flask development server
- Start the Celery worker
In one terminal, start the Flask application:

```bash
python app.py
```

In another terminal, start the Celery worker:

```bash
celery -A app.celery worker --loglevel=info
```
Now you can test your application:
- Send a POST request to `/run-task`
- Get the `task_id` from the response
- Check the task status using the `/task-status/<task_id>` endpoint
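On the client side, checking status usually means polling the endpoint until the task reaches a terminal state. Here is a hypothetical polling helper (the `fetch_status` callable and state names mirror the status endpoint above; the helper itself is not part of Flask or Celery):

```python
import time

TERMINAL_STATES = {'SUCCESS', 'FAILURE'}

def wait_for_task(fetch_status, task_id, interval=1.0, timeout=30.0):
    """Poll fetch_status(task_id) until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status(task_id)
        if status['state'] in TERMINAL_STATES:
            return status
        time.sleep(interval)
    raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")

# Example with a fake fetcher that succeeds on the third poll
responses = iter([
    {'state': 'PENDING'},
    {'state': 'PENDING'},
    {'state': 'SUCCESS', 'result': 42},
])
final = wait_for_task(lambda _tid: next(responses), 'abc123', interval=0.01)
print(final['result'])  # 42
```

In a real client, `fetch_status` would issue an HTTP GET to `/task-status/<task_id>` and decode the JSON response.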
Creating a Factory Pattern for Flask-Celery Integration
For larger applications, it's better to use the Flask application factory pattern to integrate Celery:
```python
# app.py
from flask import Flask
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

def create_app():
    app = Flask(__name__)
    app.config.update(
        CELERY_BROKER_URL='redis://localhost:6379/0',
        CELERY_RESULT_BACKEND='redis://localhost:6379/0'
    )
    # Other Flask configurations and blueprints...
    return app

app = create_app()
celery = make_celery(app)

# Now define your tasks
@celery.task
def add_together(a, b):
    return a + b
```

The `ContextTask` subclass ensures every task body runs inside a Flask application context, so tasks can use extensions and configuration that depend on `current_app`.
Real-World Examples
Let's look at some practical use cases for Celery in a Flask application:
Example 1: Sending Email Notifications
```python
# tasks.py
from flask_mail import Mail, Message
from celery import shared_task

from app import app

mail = Mail(app)

@shared_task
def send_email_task(subject, recipient, body):
    with app.app_context():
        msg = Message(
            subject=subject,
            recipients=[recipient],
            body=body
        )
        mail.send(msg)
        return f"Email sent to {recipient}"
```

```python
# routes.py
from flask import request, jsonify

from app import app
from tasks import send_email_task

@app.route('/send-email', methods=['POST'])
def send_email():
    data = request.json
    subject = data.get('subject')
    recipient = data.get('recipient')
    body = data.get('body')
    # Queue the email sending task
    task = send_email_task.delay(subject, recipient, body)
    return jsonify({
        'message': 'Email queued for delivery',
        'task_id': task.id
    })
```
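Because the task runs long after the HTTP request has ended, it is worth validating the payload before queuing anything. A small hypothetical helper (my own sketch, not part of Flask or Celery):

```python
def validate_email_payload(data):
    """Return a list of missing-field errors for the /send-email payload."""
    errors = []
    for field in ('subject', 'recipient', 'body'):
        if not (data or {}).get(field):
            errors.append(f"'{field}' is required")
    return errors

# In the route, reject bad input before calling .delay():
#     errors = validate_email_payload(request.json)
#     if errors:
#         return jsonify({'errors': errors}), 400
print(validate_email_payload({'subject': 'Hi'}))
# ["'recipient' is required", "'body' is required"]
```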
Example 2: Image Processing
```python
# tasks.py
from celery import shared_task
from PIL import Image
import os

@shared_task
def resize_image(image_path, width, height):
    try:
        img = Image.open(image_path)
        resized_img = img.resize((width, height))
        # Generate the output path
        filename = os.path.basename(image_path)
        name, ext = os.path.splitext(filename)
        output_path = f"static/resized/{name}_resized{ext}"
        # Make sure the directory exists
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        # Save the resized image
        resized_img.save(output_path)
        return output_path
    except Exception as e:
        return str(e)
```

```python
# routes.py
from flask import request, jsonify, url_for
from werkzeug.utils import secure_filename
import os

from app import app
from tasks import resize_image

@app.route('/upload-image', methods=['POST'])
def upload_image():
    if 'image' not in request.files:
        return jsonify({'error': 'No image part'}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    # Save the uploaded file under a sanitized name
    upload_folder = 'static/uploads'
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, secure_filename(file.filename))
    file.save(file_path)
    # Get the requested dimensions
    width = int(request.form.get('width', 300))
    height = int(request.form.get('height', 300))
    # Queue the resizing task
    task = resize_image.delay(file_path, width, height)
    return jsonify({
        'message': 'Image processing started',
        'task_id': task.id,
        'check_status_url': url_for('task_status', task_id=task.id, _external=True)
    })
```
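The output-path naming in `resize_image` can be factored into a small pure function, which makes it easy to unit-test without touching PIL or the filesystem (the helper name is my own, not from any library):

```python
import os

def resized_output_path(image_path, output_dir="static/resized"):
    """Map an input path to its '<name>_resized<ext>' counterpart."""
    name, ext = os.path.splitext(os.path.basename(image_path))
    return f"{output_dir}/{name}_resized{ext}"

print(resized_output_path("static/uploads/cat.jpg"))  # static/resized/cat_resized.jpg
```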
Advanced Celery Features
Scheduled Tasks with Celery Beat
Celery Beat allows you to schedule periodic tasks:
```python
from celery.schedules import crontab

celery.conf.beat_schedule = {
    'daily-cleanup-task': {
        'task': 'app.tasks.cleanup_old_data',
        'schedule': crontab(hour=3, minute=0),  # Run at 3:00 AM every day
    },
    'hourly-stats-update': {
        'task': 'app.tasks.update_stats',
        'schedule': 60 * 60,  # Every hour (in seconds)
    }
}
```
Start the Celery Beat scheduler:
```bash
celery -A app.celery beat
```
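As a sanity check on what `crontab(hour=3, minute=0)` means, here is the "next 3:00 AM" computation in plain `datetime` arithmetic — for illustration only, since Celery Beat does this scheduling for you:

```python
from datetime import datetime, timedelta

def next_3am(now):
    """Return the next 3:00 AM strictly after `now`."""
    candidate = now.replace(hour=3, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate

print(next_3am(datetime(2024, 1, 1, 12, 0)))  # 2024-01-02 03:00:00
print(next_3am(datetime(2024, 1, 1, 2, 30)))  # 2024-01-01 03:00:00
```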
Task Chains and Groups
You can chain tasks or run them in parallel:
```python
# Task chains - tasks executed sequentially,
# each task receiving the previous task's result
from celery import chain

result = chain(
    task1.s(arg1, arg2),
    task2.s(arg3),
    task3.s()
)()

# Task groups - tasks executed in parallel
from celery import group

tasks = group(
    task1.s(arg1),
    task1.s(arg2),
    task1.s(arg3)
)()
```
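Conceptually, a chain threads each task's return value into the next signature as its first argument. In plain Python, that behavior is a fold over the steps — a sketch of the semantics, not Celery's implementation:

```python
from functools import reduce

def run_chain(first_result, *steps):
    """Feed each step's result into the next, like chain(t1.s(), t2.s())()."""
    return reduce(lambda acc, step: step(acc), steps, first_result)

# e.g. fetch -> double -> increment
total = run_chain(10, lambda x: x * 2, lambda x: x + 1)
print(total)  # 21
```

A group, by contrast, runs its signatures independently and collects all the results, so there is no value-passing between them.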
Task Progress Updates
You can update task progress:
```python
@celery.task(bind=True)
def long_running_task_with_progress(self, items):
    total = len(items)
    for i, item in enumerate(items):
        # Process item
        process_item(item)
        # Update task state
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': total, 'percent': ((i + 1) / total) * 100}
        )
    return {'status': 'Task completed', 'result': 'Success'}
```
Best Practices
- Task Idempotency: Design tasks to be idempotent (safe to execute multiple times with the same result)
- Handle Failures: Always include error handling and retries in your tasks
- Task Timeouts: Set time limits on long-running tasks so they cannot block a worker indefinitely
- Monitoring: Use a monitoring tool such as Flower to observe task execution
- Result Backend: Use a persistent result backend for important tasks
- Task Priorities: Route tasks of different priorities to different queues
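Idempotency in practice often means keying work on a stable identifier and skipping items that were already processed. A minimal sketch — the dedup store is an in-memory set here purely for illustration; in production it would be Redis or a database:

```python
processed = set()  # stand-in for a persistent dedup store

def charge_customer(order_id, amount, ledger):
    """Apply the charge at most once, no matter how often the task is retried."""
    if order_id in processed:
        return "skipped"
    ledger.append((order_id, amount))
    processed.add(order_id)
    return "charged"

ledger = []
print(charge_customer("order-42", 9.99, ledger))  # charged
print(charge_customer("order-42", 9.99, ledger))  # skipped (safe redelivery)
print(len(ledger))  # 1
```

With this design, a message that gets redelivered after a worker crash does no extra harm.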
Common Issues and Solutions
Workers Not Starting
If your workers aren't starting, check that:
- Redis (or your broker) is running
- The broker URL is correct
- You're running the Celery worker from the correct directory
Tasks Getting Lost
If tasks are being lost:
- Check that the task isn't crashing due to exceptions
- Ensure your broker is properly configured
- Consider enabling acknowledgements
High Memory Usage
If you're experiencing high memory usage:
- Check for memory leaks in your tasks
- Consider enabling task retries with backoff
- Implement concurrency limits
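Celery supports retries with backoff natively (`retry_backoff=True` on the task decorator, usually paired with `autoretry_for`); the delay schedule it produces is simple exponential growth with a cap, which can be sketched as:

```python
def backoff_delays(retries, base=1, cap=600):
    """Exponential backoff: base * 2**n seconds per retry, capped (cf. retry_backoff)."""
    return [min(base * (2 ** n), cap) for n in range(retries)]

print(backoff_delays(6))  # [1, 2, 4, 8, 16, 32]
```

Spacing retries out this way keeps a crashing task from hammering the broker and the workers in a tight loop.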
Summary
In this guide, we've learned how to:
- Set up Celery with Flask
- Create and execute asynchronous tasks
- Check task status and results
- Use the factory pattern for larger applications
- Implement real-world examples like email sending and image processing
- Explore advanced Celery features like scheduled tasks and task chains
By integrating Celery with Flask, you can significantly improve your application's responsiveness and user experience by offloading time-consuming operations to background processes.
Exercises
- Create a Flask application that uses Celery to generate and email PDF reports
- Implement a file conversion service that uses Celery to convert between different formats
- Build a web scraping application that uses Celery to crawl websites in the background
- Create a data analysis pipeline where each step is a separate Celery task
- Implement a task monitoring dashboard using Flask and Celery's inspection API