Flask Task Queues
In web development, particularly with Flask applications, users expect responsive interfaces that don't freeze while processing requests. Task queues solve this problem by allowing you to execute time-consuming operations asynchronously, outside of the main request-response cycle.
What Are Task Queues?
Task queues are mechanisms that allow you to defer the execution of work outside the HTTP request-response cycle. They're essential tools for modern web applications when you need to:
- Process long-running tasks without blocking user interactions
- Schedule tasks to run in the background
- Distribute workloads across multiple worker processes
- Handle resource-intensive operations without impacting user experience
Why Use Task Queues in Flask?
Flask, being a synchronous web framework, executes requests one at a time. When a request involves time-consuming operations such as:
- Sending emails
- Processing images or videos
- Generating reports
- Making external API calls
- Data analysis tasks
Without task queues, users would have to wait until these operations complete before receiving a response. This creates a poor user experience with slow page loads and timeouts.
Common Task Queue Solutions for Flask
1. Celery
Celery is the most popular task queue system for Python applications, with excellent Flask integration.
Setting up Celery with Flask
First, install the necessary packages:
pip install Flask celery redis
Next, create a basic Flask application with Celery integration:
# app.py
from flask import Flask
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# Initialize Celery
celery = Celery(
app.name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)
@app.route('/')
def index():
return "Flask Task Queue Example"
# Define a Celery task
@celery.task
def long_running_task(param1, param2):
# Simulate a task that takes 10 seconds
import time
time.sleep(10)
return param1 + param2
@app.route('/process')
def process_task():
# Trigger the task asynchronously
task = long_running_task.delay(10, 20)
return f"Task initiated with ID: {task.id}. Check results later."
@app.route('/result/<task_id>')
def task_result(task_id):
task = long_running_task.AsyncResult(task_id)
if task.state == 'PENDING':
return 'Task is pending'
elif task.state == 'FAILURE':
return f'Task failed: {str(task.info)}'
else:
return f'Task complete with result: {task.result}'
if __name__ == '__main__':
app.run(debug=True)
To run this example:
- Start Redis (Celery broker):
redis-server
- Start Celery worker:
celery -A app.celery worker --loglevel=info
- Run Flask application:
python app.py
2. Redis Queue (RQ)
RQ is a simpler alternative to Celery, using Redis as its backend.
Setting up RQ with Flask
First, install the necessary packages:
pip install Flask rq flask-rq2 redis
Create a Flask application with RQ:
# app.py
from flask import Flask
from flask_rq2 import RQ
app = Flask(__name__)
app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'
rq = RQ(app)
@app.route('/')
def index():
return "Flask RQ Task Queue Example"
# Define an RQ job
@rq.job
def slow_task(x, y):
import time
time.sleep(10) # Simulate a long process
return x + y
@app.route('/start-task')
def start_task():
job = slow_task.queue(5, 10)
return f"Task started with ID: {job.id}"
@app.route('/result/<job_id>')
def get_result(job_id):
job = slow_task.fetch_job(job_id)
if job.is_finished:
return f"Result: {job.result}"
elif job.is_failed:
return "Job failed"
else:
return "Job pending"
if __name__ == '__main__':
app.run(debug=True)
To run this example:
- Start Redis:
redis-server
- Start RQ worker:
rq worker
- Run Flask application:
python app.py
Real-World Applications
1. Sending Email Notifications
A common use case for task queues is sending emails without making users wait:
@celery.task
def send_welcome_email(user_email):
# Connect to email server
# Create message
# Send email
print(f"Sending welcome email to {user_email}")
time.sleep(5) # Simulate email sending delay
return True
@app.route('/register', methods=['POST'])
def register():
user_email = request.form['email']
# Save user to database
# Queue email sending task
send_welcome_email.delay(user_email)
return "Registration successful! Welcome email will be sent shortly."
2. Image Processing Application
When users upload images that need processing:
@celery.task
def process_image(image_path):
from PIL import Image, ImageFilter
# Open the image
img = Image.open(image_path)
# Process the image (resize, apply filter, etc.)
img = img.resize((800, 600))
img = img.filter(ImageFilter.SHARPEN)
# Save the processed image
processed_path = image_path.replace('.jpg', '_processed.jpg')
img.save(processed_path)
return processed_path
@app.route('/upload', methods=['POST'])
def upload_image():
if 'image' not in request.files:
return "No image uploaded", 400
image = request.files['image']
image_path = f"uploads/{secure_filename(image.filename)}"
image.save(image_path)
# Queue image processing task
task = process_image.delay(image_path)
return f"Image uploaded and queued for processing. Task ID: {task.id}"
3. Scheduled Report Generation
For periodic tasks, you can combine task queues with scheduling:
from celery.schedules import crontab
# Configure Celery beat schedule
celery.conf.beat_schedule = {
'generate-daily-report': {
'task': 'app.generate_daily_report',
'schedule': crontab(hour=0, minute=0), # Run at midnight
},
}
@celery.task
def generate_daily_report():
# Connect to database
# Generate report data
# Create PDF or email report
print("Generating daily report")
return "Report generated successfully"
Best Practices for Flask Task Queues
-
Keep tasks idempotent: A task should produce the same result if executed multiple times.
-
Handle failures gracefully: Implement retry mechanisms for tasks that might fail.
python@celery.task(bind=True, max_retries=3)
def reliable_task(self, x, y):
try:
# Task logic here
return x + y
except Exception as exc:
self.retry(exc=exc, countdown=60) # Retry after 1 minute -
Store task results properly: For important tasks, ensure results are properly stored.
-
Monitor your queues: Implement monitoring to detect bottlenecks or failures.
-
Set appropriate timeouts: Prevent tasks from running indefinitely.
-
Use task chains for complex workflows: Break down complex operations into smaller chained tasks.
pythonfrom celery import chain
# Create a chain of tasks
workflow = chain(
extract_data.s(),
transform_data.s(),
load_data.s()
)
# Execute the workflow
result = workflow.apply_async()
Common Challenges and Solutions
Challenge: Task Result Management
Solution: Use a result backend (Redis or database) to store task results.
Challenge: Task Queue Monitoring
Solution: Use Flower for Celery monitoring:
pip install flower
celery -A app.celery flower # Access dashboard at http://localhost:5555
Challenge: Worker Crashes
Solution: Implement supervisor or systemd to automatically restart workers.
Summary
Task queues are essential tools for Flask applications that need to handle time-consuming operations without impacting the user experience. By integrating task queue systems like Celery or RQ, you can:
- Process long-running operations in the background
- Improve application responsiveness
- Scale your application more effectively
- Handle resource-intensive operations without blocking the web server
Whether you're sending emails, processing images, generating reports, or making external API calls, task queues provide the asynchronous processing capabilities needed to build responsive Flask applications.
Additional Resources
- Celery Documentation
- Redis Queue Documentation
- Flask-Celery-Helper
- Flask Factory Pattern with Celery
Exercises
- Modify the email sending example to include attachments and HTML content.
- Create a Flask application that uses task queues to generate and serve PDF reports.
- Implement a batch processing system using task queues to process large CSV files.
- Build an application that uses scheduled tasks to scrape a website daily and store the results.
- Create a Flask API that uses task queues to handle long-running data science computations.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)