FastAPI Redis Queue
When building web applications, certain operations may take too long to process during a regular HTTP request. These operations, such as sending emails, processing images, or generating reports, are perfect candidates for background processing. In this tutorial, we'll explore how to use Redis Queue (RQ) with FastAPI to handle background tasks efficiently.
What is Redis Queue?
Redis Queue (RQ) is a simple Python library for queueing jobs and processing them in the background with workers. It's built on top of Redis, an in-memory data structure store that can be used as a database, cache, and message broker.
Using RQ with FastAPI allows us to:
- Offload time-consuming tasks from the request-response cycle
- Process tasks asynchronously in the background
- Scale our application more effectively
- Improve user experience by not making users wait for long operations
Prerequisites
Before we start, make sure you have the following installed:
pip install fastapi uvicorn redis rq
You'll also need a Redis server running. You can install and run Redis locally, or use a Docker container:
docker run -d -p 6379:6379 redis
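Before wiring up the application, it can help to confirm the Redis port is actually reachable. A minimal stdlib-only sketch (this is a plain TCP probe of my own, not part of RQ or redis-py):

```python
import socket

def redis_reachable(host: str = "localhost", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to the Redis port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

If this returns False, check that the Redis container or local server is really listening on port 6379 before continuing.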
Setting up a Basic FastAPI Application with Redis Queue
Let's create a simple FastAPI application that uses Redis Queue for background processing.
Step 1: Create the project structure
fastapi-redis-queue/
├── main.py # FastAPI application
├── worker.py # RQ worker
└── tasks.py # Background tasks
Step 2: Define the background tasks
Let's create a file named tasks.py with some example background tasks:
import time
from datetime import datetime

def process_long_running_task(task_id):
    """
    A sample long-running task.
    This simulates a task that takes a long time to complete.
    """
    print(f"Starting task {task_id} at {datetime.now()}")
    # Simulate a time-consuming task
    time.sleep(10)
    result = f"Task {task_id} completed at {datetime.now()}"
    print(result)
    return result

def send_email(to_address, subject, content):
    """
    Simulates sending an email.
    In a real application, this would use an email service.
    """
    print(f"Sending email to {to_address}")
    # Simulate delay in sending email
    time.sleep(2)
    print(f"Email sent to {to_address} with subject: {subject}")
    return {"status": "sent", "to": to_address, "subject": subject}
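Note that these tasks are plain Python functions, so you can call and unit-test them directly, with no Redis server or worker involved. For example (send_email is reproduced from tasks.py above so the snippet runs standalone):

```python
import time

def send_email(to_address, subject, content):
    """Same function as in tasks.py, copied here so this snippet is self-contained."""
    print(f"Sending email to {to_address}")
    time.sleep(2)  # Simulate delay in sending email
    print(f"Email sent to {to_address} with subject: {subject}")
    return {"status": "sent", "to": to_address, "subject": subject}

result = send_email("user@example.com", "Hi", "Hello!")
# result == {"status": "sent", "to": "user@example.com", "subject": "Hi"}
```

RQ will later invoke this exact function inside the worker process; nothing about it is RQ-specific.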
Step 3: Create the RQ worker
Now, let's create the worker.py file that will process our background tasks:
import redis
from rq import Worker, Queue

# Configure Redis connection
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Define which queues this worker should listen to
listen = ['default']

if __name__ == '__main__':
    # Recent RQ releases dropped the Connection context manager,
    # so pass the connection to Queue and Worker explicitly
    queues = [Queue(name, connection=redis_conn) for name in listen]
    worker = Worker(queues, connection=redis_conn)
    worker.work()
Step 4: Create the FastAPI application
Next, let's create the main.py file with our FastAPI application:
from fastapi import FastAPI, HTTPException
import redis
from rq import Queue
from uuid import uuid4

from tasks import process_long_running_task, send_email

app = FastAPI(title="FastAPI Redis Queue Example")

# Configure Redis connection
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Create RQ queue
queue = Queue(connection=redis_conn)
@app.get("/")
async def root():
    return {"message": "FastAPI Redis Queue Example"}

@app.post("/tasks/long-running")
async def create_long_running_task():
    task_id = str(uuid4())
    # Enqueue the task
    job = queue.enqueue(process_long_running_task, task_id)
    return {
        "task_id": task_id,
        "job_id": job.id,
        "status": "queued",
        "message": "Task has been queued for processing"
    }

@app.post("/email/send")
async def send_email_task(to_address: str, subject: str, content: str):
    # Enqueue email sending task
    job = queue.enqueue(send_email, to_address, subject, content)
    return {
        "job_id": job.id,
        "status": "queued",
        "message": f"Email to {to_address} has been queued"
    }

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    job = queue.fetch_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job_id,
        "status": job.get_status(),
        "result": job.result,
        "enqueued_at": job.enqueued_at,
        "started_at": job.started_at,
        "ended_at": job.ended_at
    }
Step 5: Run the application
Now you can run the application with the following commands:
- Start the FastAPI application:
uvicorn main:app --reload
- Start the RQ worker in a separate terminal:
python worker.py
How It Works
Let's break down the workflow of our application:
- When a client makes a request to /tasks/long-running or /email/send, the FastAPI endpoint enqueues a task in Redis Queue
- The endpoint immediately returns a response with the job ID, without waiting for the task to complete
- The RQ worker, running in a separate process, picks up the task from the queue and executes it
- Clients can check the status of their tasks by making requests to /jobs/{job_id}
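On the client side, that last step usually becomes a polling loop. Here is a sketch of such a helper; fetch_status is a hypothetical stand-in for whatever HTTP call your client makes to /jobs/{job_id}:

```python
import time
from typing import Callable

def wait_for_job(fetch_status: Callable[[], dict],
                 poll_interval: float = 0.5,
                 timeout: float = 30.0) -> dict:
    """Poll fetch_status until the job reports a terminal RQ state.

    fetch_status is any zero-argument callable returning a dict shaped
    like the /jobs/{job_id} response above.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status["status"] in ("finished", "failed"):
            return status
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError("job did not finish in time")
        time.sleep(poll_interval)
```

In production you might prefer webhooks or server-sent events over polling, but a loop like this is often enough for internal tools.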
Advanced Usage: Multiple Queues
In a real-world application, you might want to have different queues for different types of tasks. Let's modify our application to support multiple queues:
# In main.py
default_queue = Queue('default', connection=redis_conn)
email_queue = Queue('email', connection=redis_conn)
high_priority_queue = Queue('high', connection=redis_conn)

@app.post("/tasks/long-running")
async def create_long_running_task():
    task_id = str(uuid4())
    # Use the default queue
    job = default_queue.enqueue(process_long_running_task, task_id)
    return {"task_id": task_id, "job_id": job.id, "status": "queued"}

@app.post("/email/send")
async def send_email_task(to_address: str, subject: str, content: str):
    # Use the email queue
    job = email_queue.enqueue(send_email, to_address, subject, content)
    return {"job_id": job.id, "status": "queued"}

@app.post("/tasks/high-priority")
async def high_priority_task(task_name: str):
    # Use the high priority queue
    job = high_priority_queue.enqueue(process_long_running_task, task_name)
    return {"job_id": job.id, "status": "queued", "priority": "high"}
Then update the worker to listen to multiple queues:
# In worker.py
listen = ['high', 'default', 'email']  # Order matters - high priority first

if __name__ == '__main__':
    queues = [Queue(name, connection=redis_conn) for name in listen]
    worker = Worker(queues, connection=redis_conn)
    worker.work()
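To see why the listen order matters, here is a toy model (plain Python, not RQ internals) of how a worker drains its queues: each time it looks for work, it takes the next job from the first non-empty queue in its listen order.

```python
from collections import deque

# Jobs already sitting in three queues (hypothetical names and payloads)
queues = {
    "high": deque(["urgent-1"]),
    "default": deque(["task-1", "task-2"]),
    "email": deque(["mail-1"]),
}
listen = ["high", "default", "email"]

def next_job():
    """Return the next (queue_name, job) pair, honoring listen order."""
    for name in listen:
        if queues[name]:
            return name, queues[name].popleft()
    return None

processed = []
while (job := next_job()) is not None:
    processed.append(job)
# "high" drains first, then "default", then "email"
```

Note this gives strict priority, not weighted fairness: a steady stream of high-priority jobs can starve the other queues, which is why separate dedicated workers per queue are sometimes preferable.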
Adding Job Timeouts and Retries
RQ allows you to specify timeouts and automatic retry behavior:
from rq import Retry

# Set a timeout of 30 seconds for this job
job = queue.enqueue(process_long_running_task, task_id, job_timeout=30)

# Retry the job up to 3 times if it fails, waiting 10s, 30s, then 60s
job = queue.enqueue(send_email, to_address, subject, content,
                    retry=Retry(max=3, interval=[10, 30, 60]))
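The interval list can be hand-written as above, or derived. A small convenience helper of my own (not part of RQ) for exponential backoff:

```python
def backoff_intervals(base: float = 10, factor: float = 2, retries: int = 3) -> list:
    """Build an interval list for rq.Retry, e.g. [10, 20, 40] seconds."""
    return [int(base * factor ** i) for i in range(retries)]

# Used as: Retry(max=3, interval=backoff_intervals())
```

Exponential backoff is a common choice for flaky external services (like email providers), since it spaces retries out as failures persist.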
Real-world Example: Report Generation
Here's a more practical example of using Redis Queue for generating reports:
# In tasks.py
import time
from datetime import datetime

def generate_report(user_id, report_params):
    """
    Generate a complex report based on user data.
    This simulates a database-intensive operation.
    """
    print(f"Generating report for user {user_id} at {datetime.now()}")
    # Simulate fetching data from database
    time.sleep(5)
    # Simulate data processing
    time.sleep(3)
    # Simulate report creation
    report_data = {
        "user_id": user_id,
        "generated_at": datetime.now().isoformat(),
        "report_type": report_params.get("type", "general"),
        "data_points": 1000,
        "summary": "This is a sample report summary"
    }
    # In a real application, you might save this to a database or file storage
    print(f"Report for user {user_id} completed at {datetime.now()}")
    return report_data
# In main.py
@app.post("/reports/generate")
async def generate_user_report(user_id: int, report_type: str):
    # Enqueue report generation
    job = queue.enqueue(
        generate_report,
        user_id,
        {"type": report_type},
        job_timeout=300  # Allow 5 minutes for report generation
    )
    return {
        "job_id": job.id,
        "user_id": user_id,
        "status": "queued",
        "message": "Report generation has been queued"
    }

@app.get("/reports/status/{job_id}")
async def get_report_status(job_id: str):
    job = queue.fetch_job(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Report job not found")
    if job.get_status() == "finished":
        return {
            "status": "completed",
            "report_data": job.result
        }
    return {
        "status": job.get_status(),
        "position": job.get_position()
    }
Summary
In this tutorial, we've learned:
- How to integrate Redis Queue (RQ) with FastAPI for background processing
- Setting up workers to process queued tasks
- Creating and monitoring background jobs
- Working with multiple queues for different priorities
- Implementing timeouts and retry mechanisms
- Building a practical report generation system
Using Redis Queue with FastAPI provides a powerful way to handle time-consuming tasks without blocking the main request-response cycle, resulting in a more responsive and scalable application.
Exercises
- Implement a background task that processes uploaded images (resizing, format conversion, etc.)
- Create a system that sends batch emails with progress tracking
- Build a job scheduler that allows scheduling tasks for future execution using RQ Scheduler
- Extend the report generation example to save reports to a database and notify users when they're ready
- Implement a dashboard that shows the current status of all queues and workers
By combining FastAPI's performance with Redis Queue's simplicity and reliability, you can build powerful asynchronous processing systems that scale well and provide excellent user experiences.