Flask Async Tasks
Introduction
In web applications, some operations can take a significant amount of time to complete, such as image processing, sending emails, or generating reports. Rather than making users wait for these operations to finish, it's often better to handle them asynchronously in the background. This approach improves user experience by providing immediate responses while processing time-consuming tasks separately.
Flask, being a lightweight web framework, doesn't include built-in support for background task processing. However, we can integrate Flask with tools like Celery and Redis to implement robust asynchronous task handling. This tutorial will guide you through setting up and using asynchronous tasks in a Flask application.
Why Use Async Tasks?
Before diving into implementation, let's understand why async tasks are important:
- Better User Experience: Users don't have to wait for long-running operations to complete
- Improved Application Performance: Your Flask server can respond to new requests instead of being tied up with lengthy operations
- Resource Management: You can control how many background tasks run simultaneously
- Task Scheduling: Execute tasks at specific times or intervals
- Reliability: Retry mechanisms for failed tasks
Setting Up Celery with Flask
Celery is a distributed task queue that helps with asynchronous task processing. Let's set it up with Flask and Redis (which we'll use as our message broker).
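To build intuition for what Celery and Redis do for us, here is a toy sketch of the same producer/worker pattern using only the standard library — a single in-process queue stands in for the broker and a thread stands in for the worker. This is an illustration of the pattern only, not how Celery is implemented:

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the Redis broker
results = {}                 # stands in for the result backend

def worker():
    # the "Celery worker": pull tasks off the queue and run them
    while True:
        task_id, func, args = task_queue.get()
        results[task_id] = func(*args)
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

# the "Flask route": enqueue work and return immediately
task_queue.put(("task-1", lambda a, b: a + b, (10, 20)))

task_queue.join()            # wait here only so we can inspect the result
print(results["task-1"])     # → 30
```

The web request's only job is the `put`: it hands work off and returns at once, which is exactly the behaviour we want from `task.delay(...)` later.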
Installation
First, install the required packages:
```bash
pip install Flask "celery[redis]" redis
```

(The quotes around `celery[redis]` prevent some shells, such as zsh, from treating the brackets as a glob pattern.)
Basic Project Structure
For a clean implementation, let's organize our project like this:
```
flask_async_app/
├── app/
│   ├── __init__.py
│   ├── routes.py
│   └── tasks.py
├── config.py
└── run.py
```
Configuration
Let's create our configuration file (config.py):

```python
class Config:
    SECRET_KEY = 'your-secret-key'
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
```
Flask Application Factory
Now, let's set up our Flask app with Celery integration (app/__init__.py):

```python
from flask import Flask
from celery import Celery
from config import Config

# Initialize Celery. The `include` argument makes the worker import
# app/tasks.py, so the tasks defined there get registered.
celery = Celery(__name__,
                broker=Config.CELERY_BROKER_URL,
                backend=Config.CELERY_RESULT_BACKEND,
                include=['app.tasks'])

def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)

    # Initialize Celery with the Flask app's configuration
    celery.conf.update(app.config)

    # Wrap task execution in the Flask app context so tasks can use
    # extensions like the database
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    # Register blueprints
    from app.routes import main
    app.register_blueprint(main)

    return app
```
Creating Tasks
Let's define some async tasks in app/tasks.py:

```python
import time

from app import celery

@celery.task
def long_running_task(param1, param2):
    """A sample long-running task that simulates heavy processing."""
    # Simulate a task that takes 10 seconds
    time.sleep(10)
    result = param1 + param2
    return f"Task completed with result: {result}"

@celery.task
def send_email_task(recipient, subject, body):
    """Task to send an email asynchronously."""
    # Here you would integrate with an email-sending library
    time.sleep(5)  # Simulate email-sending delay
    print(f"Email sent to {recipient} with subject: {subject}")
    return True
```
Setting Up Routes
Let's create some routes to trigger these tasks (app/routes.py):

```python
from flask import Blueprint, jsonify, request

from app.tasks import long_running_task, send_email_task

main = Blueprint('main', __name__)

@main.route('/process', methods=['POST'])
def process():
    data = request.json
    # Start the task asynchronously
    task = long_running_task.delay(data['param1'], data['param2'])
    # Return immediately with the task ID
    return jsonify({
        "message": "Task started",
        "task_id": task.id
    }), 202

@main.route('/send-email', methods=['POST'])
def send_email():
    data = request.json
    task = send_email_task.delay(
        data['recipient'],
        data['subject'],
        data['body']
    )
    return jsonify({
        "message": "Email sending initiated",
        "task_id": task.id
    }), 202

@main.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    task = long_running_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': 'Task is pending...'
        }
    elif task.state == 'FAILURE':
        response = {
            'state': task.state,
            'status': 'Task failed',
            'error': str(task.info)
        }
    else:
        response = {
            'state': task.state,
            'status': task.info if task.info else 'Task completed'
        }
    return jsonify(response)
```
Running the Application
Finally, create a run.py file:

```python
from app import create_app

app = create_app()

if __name__ == '__main__':
    app.run(debug=True)
```
Running Your Async Flask Application
To run your application with Celery support, you'll need:
- Redis server running (message broker)
- Celery worker process
- Flask application server
Here's how to start each component:
Start Redis
If you have Docker installed, this is the easiest way:
```bash
docker run -d -p 6379:6379 redis
```
Otherwise, install and run Redis according to your operating system's instructions.
Start Celery Worker
From your project root directory, run:
```bash
celery -A app.celery worker --loglevel=info
```
Start Flask Application
```bash
python run.py
```
Testing the Async Tasks
Once everything is running, you can test your async tasks using curl or Postman.
Example Request with curl:

```bash
curl -X POST http://localhost:5000/process \
  -H "Content-Type: application/json" \
  -d '{"param1": 10, "param2": 20}'
```

Sample Response:

```json
{
  "message": "Task started",
  "task_id": "a3b7d9c1-e45f-4a2b-91c3-5d6e7f8a9b0c"
}
```

Check Task Status:

```bash
curl http://localhost:5000/task/a3b7d9c1-e45f-4a2b-91c3-5d6e7f8a9b0c
```

Initial Response:

```json
{
  "state": "PENDING",
  "status": "Task is pending..."
}
```

After Task Completion:

```json
{
  "state": "SUCCESS",
  "status": "Task completed with result: 30"
}
```
Advanced Topics
Task Scheduling
You can schedule tasks to run at specific times:

```python
from celery.schedules import crontab

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Execute daily at midnight
    sender.add_periodic_task(
        crontab(hour=0, minute=0),
        backup_database.s(),
    )
    # Execute every 30 minutes
    sender.add_periodic_task(
        30 * 60,  # 30 minutes in seconds
        check_system_health.s(),
    )

@celery.task
def backup_database():
    # Database backup logic here
    pass

@celery.task
def check_system_health():
    # Health check logic here
    pass
```

Note that periodic tasks are dispatched by a separate scheduler process, Celery Beat, which you start alongside your worker with `celery -A app.celery beat --loglevel=info`.
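To make the crontab semantics concrete, here is a tiny stand-in that answers "should a job with this hour/minute spec fire at this moment?". It is a deliberate simplification of what the scheduler evaluates — real crontab entries also support lists, ranges, and step values:

```python
from datetime import datetime

def due(spec, now):
    """spec is a dict like {'hour': 0, 'minute': 0}; '*' matches anything."""
    return all(
        spec.get(field, '*') in ('*', getattr(now, field))
        for field in ('hour', 'minute')
    )

midnight_job = {'hour': 0, 'minute': 0}
print(due(midnight_job, datetime(2023, 6, 1, 0, 0)))    # → True
print(due(midnight_job, datetime(2023, 6, 1, 12, 30)))  # → False
```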
Task Chaining
You can chain tasks so that each one executes after the previous one completes, receiving its return value as the first argument of the next signature:

```python
from celery import chain

# In your route handler. resize_image, add_watermark, and
# upload_to_cloud are assumed to be tasks defined in app/tasks.py.
@main.route('/process-image', methods=['POST'])
def process_image():
    img_path = request.json.get('image_path')
    # Create a chain of tasks
    task_chain = chain(
        resize_image.s(img_path, (800, 600)),
        add_watermark.s("Copyright 2023"),
        upload_to_cloud.s()
    )
    result = task_chain()
    return jsonify({
        "message": "Image processing started",
        "task_id": result.id
    }), 202
```
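Stripped of Celery, chaining is just function composition: each step's result feeds the next. This plain-Python sketch mimics that data flow (`resize`, `watermark`, and `upload` here are made-up stand-ins, not the Celery tasks):

```python
from functools import reduce

def resize(path, size):
    return f"{path} resized to {size[0]}x{size[1]}"

def watermark(desc, text):
    return f"{desc}, watermarked '{text}'"

def upload(desc):
    return f"uploaded: {desc}"

# Pre-bind the extra arguments, much like .s(...) does for a signature
steps = [
    lambda r: resize(r, (800, 600)),
    lambda r: watermark(r, "Copyright 2023"),
    upload,
]

# Feed each step's result into the next, like chain(...)() does
result = reduce(lambda acc, step: step(acc), steps, "cat.jpg")
print(result)
# → uploaded: cat.jpg resized to 800x600, watermarked 'Copyright 2023'
```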
Error Handling and Retries
Celery provides built-in retry mechanisms:

```python
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def process_payment(self, payment_id, amount):
    # payment_gateway and PaymentGatewayError stand in for whatever
    # payment library you actually use
    try:
        # Try to process the payment
        payment_gateway.charge(payment_id, amount)
    except PaymentGatewayError as exc:
        # Retry after 60 seconds; raising lets Celery record the retry
        raise self.retry(exc=exc)
```
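The retry pattern generalizes beyond Celery. This standalone sketch shows the same idea — bounded retries with a delay that grows each attempt — in plain Python. The `flaky_charge` function is made up for the demo, and the sleep is injectable so the example runs instantly:

```python
import time

def retry_call(func, *, max_retries=3, base_delay=60, sleep=time.sleep):
    """Run func, retrying on failure with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            sleep(base_delay * 2 ** attempt)  # 60s, 120s, 240s, ...

# Demo: a "payment" that fails twice, then succeeds
attempts = {"n": 0}
def flaky_charge():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("gateway timeout")
    return "charged"

delays = []  # record the backoff instead of actually sleeping
result = retry_call(flaky_charge, sleep=delays.append)
print(result, delays)  # → charged [60, 120]
```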
Real-World Example: File Processing Application
Let's build a more comprehensive example of a file processing application:
Tasks Definition

```python
import os
import time

from PIL import Image  # requires the Pillow package

from app import celery

@celery.task(bind=True)
def process_image_task(self, image_path):
    """Process an image by converting to grayscale, resizing, and optimizing."""
    try:
        # Update task state to track progress
        self.update_state(state='PROCESSING', meta={'step': 'Loading image'})
        # Open the image
        img = Image.open(image_path)

        # Convert to grayscale
        self.update_state(state='PROCESSING', meta={'step': 'Converting to grayscale'})
        gray_img = img.convert('L')
        time.sleep(2)  # Simulate processing time

        # Resize the image to half its dimensions
        self.update_state(state='PROCESSING', meta={'step': 'Resizing image'})
        resized_img = gray_img.resize((img.width // 2, img.height // 2))
        time.sleep(3)  # Simulate processing time

        # Save the processed image
        self.update_state(state='PROCESSING', meta={'step': 'Saving processed image'})
        output_path = os.path.splitext(image_path)[0] + "_processed.jpg"
        resized_img.save(output_path, optimize=True, quality=85)

        return {
            'status': 'success',
            'original_path': image_path,
            'processed_path': output_path
        }
    except Exception as e:
        return {
            'status': 'error',
            'error': str(e)
        }
```
Route for File Upload and Processing

```python
import os

from flask import Blueprint, request, jsonify, current_app
from werkzeug.utils import secure_filename

from app.tasks import process_image_task

main = Blueprint('main', __name__)

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}

def allowed_file(filename):
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@main.route('/upload', methods=['POST'])
def upload_file():
    # Check that the POST request has the file part
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    # If the user does not select a file, the browser may
    # submit an empty part without a filename
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        upload_folder = os.path.join(current_app.static_folder, 'uploads')
        # Create the directory if it doesn't exist
        os.makedirs(upload_folder, exist_ok=True)
        filepath = os.path.join(upload_folder, filename)
        file.save(filepath)
        # Start the async processing task
        task = process_image_task.delay(filepath)
        return jsonify({
            'message': 'File uploaded and processing started',
            'filename': filename,
            'task_id': task.id
        }), 202
    return jsonify({'error': 'File type not allowed'}), 400

@main.route('/process-status/<task_id>', methods=['GET'])
def get_process_status(task_id):
    task = process_image_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': 'Pending...'
        }
    elif task.state == 'PROCESSING':
        response = {
            'state': task.state,
            'status': task.info.get('step', '')
        }
    elif task.state == 'FAILURE':
        response = {
            'state': task.state,
            'status': 'Error',
            'error': str(task.info)
        }
    else:
        response = {
            'state': task.state,
            'status': 'Complete',
            'result': task.info
        }
    return jsonify(response)
```
This example demonstrates a more complex workflow where:
- Users upload an image file
- The backend saves it and initiates an async task
- Celery processes the image through multiple steps
- The task updates its state during processing
- Users can check the processing status
Best Practices for Flask Async Tasks
- Keep Tasks Small and Focused: Each task should do one thing well
- Make Tasks Idempotent: They should produce the same result if executed multiple times
- Handle Errors Properly: Implement robust error handling and retry mechanisms
- Monitor Task Queues: Use Flower (a Celery monitoring tool) to keep track of tasks
- Consider Task Result Expiration: Set appropriate expiration times for task results
- Use Task Timeouts: Set timeouts to prevent tasks from running indefinitely
- Implement Graceful Degradation: Your app should still function (in limited capacity) if the task queue is down
- Test Your Tasks: Write unit tests for your background tasks
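Idempotency (point 2 above) is worth a concrete illustration. Message brokers can re-deliver a task, for example after a worker crash, so a task body that records its work keyed by a stable ID is safe to run twice. A minimal sketch, with an in-memory dict standing in for a database table:

```python
processed = {}  # stands in for a database table keyed by order ID

def fulfill_order(order_id, amount):
    """Idempotent task body: a repeat delivery of the same order is a no-op."""
    if order_id in processed:
        return processed[order_id]  # already done; return the stored result
    receipt = {"order_id": order_id, "charged": amount}
    processed[order_id] = receipt  # record the work under a stable key
    return receipt

first = fulfill_order("order-42", 19.99)
second = fulfill_order("order-42", 19.99)  # duplicate delivery
print(first is second, len(processed))  # → True 1
```

The customer is charged once no matter how many times the message is delivered, which is exactly the property a retried or re-queued task needs.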
Summary
In this guide, we've covered how to implement asynchronous tasks in Flask applications using Celery and Redis. We've seen how to set up the environment, define tasks, trigger them from Flask routes, and check their status. We've also explored advanced topics like task scheduling, chaining, and retries, as well as a comprehensive real-world example.
Asynchronous tasks significantly improve the performance and user experience of your Flask applications by handling time-consuming operations in the background. This approach is essential for any modern web application that needs to perform complex operations without making users wait.
Additional Resources
- Official Celery Documentation
- Flask Documentation
- Redis Documentation
- Flower: Celery Monitoring Tool
Exercises
- Create a Flask application that processes uploaded CSV files asynchronously and returns the results when ready.
- Implement a scheduled task that fetches data from an external API every hour and updates the database.
- Build an email notification system with a dashboard showing sent/failed emails.
- Create a task that generates PDF reports and sends them to users.
- Implement progress tracking for a long-running batch processing task.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)