Skip to main content

Flask Task Queues

In web development, particularly with Flask applications, users expect responsive interfaces that don't freeze while processing requests. Task queues solve this problem by allowing you to execute time-consuming operations asynchronously, outside of the main request-response cycle.

What Are Task Queues?

Task queues are mechanisms that allow you to defer the execution of work outside the HTTP request-response cycle. They're essential tools for modern web applications when you need to:

  • Process long-running tasks without blocking user interactions
  • Schedule tasks to run in the background
  • Distribute workloads across multiple worker processes
  • Handle resource-intensive operations without impacting user experience

Why Use Task Queues in Flask?

Flask, being a synchronous web framework, executes requests one at a time. When a request involves time-consuming operations such as:

  • Sending emails
  • Processing images or videos
  • Generating reports
  • Making external API calls
  • Data analysis tasks

Without task queues, users would have to wait until these operations complete before receiving a response. This creates a poor user experience with slow page loads and timeouts.

Common Task Queue Solutions for Flask

1. Celery

Celery is the most popular task queue system for Python applications, with excellent Flask integration.

Setting up Celery with Flask

First, install the necessary packages:

bash
pip install Flask celery redis

Next, create a basic Flask application with Celery integration:

python
# app.py
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Initialize Celery
celery = Celery(
app.name,
broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND']
)
celery.conf.update(app.config)

@app.route('/')
def index():
return "Flask Task Queue Example"

# Define a Celery task
@celery.task
def long_running_task(param1, param2):
# Simulate a task that takes 10 seconds
import time
time.sleep(10)
return param1 + param2

@app.route('/process')
def process_task():
# Trigger the task asynchronously
task = long_running_task.delay(10, 20)
return f"Task initiated with ID: {task.id}. Check results later."

@app.route('/result/<task_id>')
def task_result(task_id):
task = long_running_task.AsyncResult(task_id)
if task.state == 'PENDING':
return 'Task is pending'
elif task.state == 'FAILURE':
return f'Task failed: {str(task.info)}'
else:
return f'Task complete with result: {task.result}'

if __name__ == '__main__':
app.run(debug=True)

To run this example:

  1. Start Redis (Celery broker): redis-server
  2. Start Celery worker: celery -A app.celery worker --loglevel=info
  3. Run Flask application: python app.py

2. Redis Queue (RQ)

RQ is a simpler alternative to Celery, using Redis as its backend.

Setting up RQ with Flask

First, install the necessary packages:

bash
pip install Flask rq flask-rq2 redis

Create a Flask application with RQ:

python
# app.py
from flask import Flask
from flask_rq2 import RQ

app = Flask(__name__)
app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'
rq = RQ(app)

@app.route('/')
def index():
return "Flask RQ Task Queue Example"

# Define an RQ job
@rq.job
def slow_task(x, y):
import time
time.sleep(10) # Simulate a long process
return x + y

@app.route('/start-task')
def start_task():
job = slow_task.queue(5, 10)
return f"Task started with ID: {job.id}"

@app.route('/result/<job_id>')
def get_result(job_id):
job = slow_task.fetch_job(job_id)
if job.is_finished:
return f"Result: {job.result}"
elif job.is_failed:
return "Job failed"
else:
return "Job pending"

if __name__ == '__main__':
app.run(debug=True)

To run this example:

  1. Start Redis: redis-server
  2. Start RQ worker: rq worker
  3. Run Flask application: python app.py

Real-World Applications

1. Sending Email Notifications

A common use case for task queues is sending emails without making users wait:

python
@celery.task
def send_welcome_email(user_email):
# Connect to email server
# Create message
# Send email
print(f"Sending welcome email to {user_email}")
time.sleep(5) # Simulate email sending delay
return True

@app.route('/register', methods=['POST'])
def register():
user_email = request.form['email']
# Save user to database

# Queue email sending task
send_welcome_email.delay(user_email)

return "Registration successful! Welcome email will be sent shortly."

2. Image Processing Application

When users upload images that need processing:

python
@celery.task
def process_image(image_path):
from PIL import Image, ImageFilter

# Open the image
img = Image.open(image_path)

# Process the image (resize, apply filter, etc.)
img = img.resize((800, 600))
img = img.filter(ImageFilter.SHARPEN)

# Save the processed image
processed_path = image_path.replace('.jpg', '_processed.jpg')
img.save(processed_path)

return processed_path

@app.route('/upload', methods=['POST'])
def upload_image():
if 'image' not in request.files:
return "No image uploaded", 400

image = request.files['image']
image_path = f"uploads/{secure_filename(image.filename)}"
image.save(image_path)

# Queue image processing task
task = process_image.delay(image_path)

return f"Image uploaded and queued for processing. Task ID: {task.id}"

3. Scheduled Report Generation

For periodic tasks, you can combine task queues with scheduling:

python
from celery.schedules import crontab

# Configure Celery beat schedule
celery.conf.beat_schedule = {
'generate-daily-report': {
'task': 'app.generate_daily_report',
'schedule': crontab(hour=0, minute=0), # Run at midnight
},
}

@celery.task
def generate_daily_report():
# Connect to database
# Generate report data
# Create PDF or email report
print("Generating daily report")
return "Report generated successfully"

Best Practices for Flask Task Queues

  1. Keep tasks idempotent: A task should produce the same result if executed multiple times.

  2. Handle failures gracefully: Implement retry mechanisms for tasks that might fail.

    python
    @celery.task(bind=True, max_retries=3)
    def reliable_task(self, x, y):
    try:
    # Task logic here
    return x + y
    except Exception as exc:
    self.retry(exc=exc, countdown=60) # Retry after 1 minute
  3. Store task results properly: For important tasks, ensure results are properly stored.

  4. Monitor your queues: Implement monitoring to detect bottlenecks or failures.

  5. Set appropriate timeouts: Prevent tasks from running indefinitely.

  6. Use task chains for complex workflows: Break down complex operations into smaller chained tasks.

    python
    from celery import chain

    # Create a chain of tasks
    workflow = chain(
    extract_data.s(),
    transform_data.s(),
    load_data.s()
    )

    # Execute the workflow
    result = workflow.apply_async()

Common Challenges and Solutions

Challenge: Task Result Management

Solution: Use a result backend (Redis or database) to store task results.

Challenge: Task Queue Monitoring

Solution: Use Flower for Celery monitoring:

bash
pip install flower
celery -A app.celery flower # Access dashboard at http://localhost:5555

Challenge: Worker Crashes

Solution: Implement supervisor or systemd to automatically restart workers.

Summary

Task queues are essential tools for Flask applications that need to handle time-consuming operations without impacting the user experience. By integrating task queue systems like Celery or RQ, you can:

  • Process long-running operations in the background
  • Improve application responsiveness
  • Scale your application more effectively
  • Handle resource-intensive operations without blocking the web server

Whether you're sending emails, processing images, generating reports, or making external API calls, task queues provide the asynchronous processing capabilities needed to build responsive Flask applications.

Additional Resources

  1. Celery Documentation
  2. Redis Queue Documentation
  3. Flask-Celery-Helper
  4. Flask Factory Pattern with Celery

Exercises

  1. Modify the email sending example to include attachments and HTML content.
  2. Create a Flask application that uses task queues to generate and serve PDF reports.
  3. Implement a batch processing system using task queues to process large CSV files.
  4. Build an application that uses scheduled tasks to scrape a website daily and store the results.
  5. Create a Flask API that uses task queues to handle long-running data science computations.


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)