Skip to main content

FastAPI Redis Queue

When building web applications, certain operations may take too long to process during a regular HTTP request. These operations, such as sending emails, processing images, or generating reports, are perfect candidates for background processing. In this tutorial, we'll explore how to use Redis Queue (RQ) with FastAPI to handle background tasks efficiently.

What is Redis Queue?

Redis Queue (RQ) is a simple Python library for queueing jobs and processing them in the background with workers. It's built on top of Redis, an in-memory data structure store that can be used as a database, cache, and message broker.

Using RQ with FastAPI allows us to:

  • Offload time-consuming tasks from the request-response cycle
  • Process tasks asynchronously in the background
  • Scale our application more effectively
  • Improve user experience by not making users wait for long operations

Prerequisites

Before we start, make sure you have the following installed:

bash
pip install fastapi uvicorn redis rq

You'll also need a Redis server running. You can install and run Redis locally, or use a Docker container:

bash
docker run -d -p 6379:6379 redis

Setting up a Basic FastAPI Application with Redis Queue

Let's create a simple FastAPI application that uses Redis Queue for background processing.

Step 1: Create the project structure

fastapi-redis-queue/
├── main.py # FastAPI application
├── worker.py # RQ worker
└── tasks.py # Background tasks

Step 2: Define the background tasks

Let's create a file named tasks.py with some example background tasks:

python
import time
from datetime import datetime

def process_long_running_task(task_id):
"""
A sample long-running task.
This simulates a task that takes a long time to complete.
"""
print(f"Starting task {task_id} at {datetime.now()}")
# Simulate a time-consuming task
time.sleep(10)
result = f"Task {task_id} completed at {datetime.now()}"
print(result)
return result

def send_email(to_address, subject, content):
"""
Simulates sending an email.
In a real application, this would use an email service.
"""
print(f"Sending email to {to_address}")
# Simulate delay in sending email
time.sleep(2)
print(f"Email sent to {to_address} with subject: {subject}")
return {"status": "sent", "to": to_address, "subject": subject}

Step 3: Create the RQ worker

Now, let's create the worker.py file that will process our background tasks:

python
import redis
from rq import Worker, Queue, Connection

# Configure Redis connection
redis_conn = redis.Redis(host='localhost', port=6379, db=0)

# Define which queues this worker should listen to
listen = ['default']

if __name__ == '__main__':
with Connection(redis_conn):
worker = Worker(list(map(Queue, listen)))
worker.work()

Step 4: Create the FastAPI application

Next, let's create the main.py file with our FastAPI application:

python
from fastapi import FastAPI, BackgroundTasks, HTTPException
import redis
from rq import Queue
from uuid import uuid4

from tasks import process_long_running_task, send_email

app = FastAPI(title="FastAPI Redis Queue Example")

# Configure Redis connection
redis_conn = redis.Redis(host='localhost', port=6379, db=0)
# Create RQ queue
queue = Queue(connection=redis_conn)

# In-memory storage for task results - in production use Redis or a database
task_results = {}

@app.get("/")
async def root():
return {"message": "FastAPI Redis Queue Example"}

@app.post("/tasks/long-running")
async def create_long_running_task():
task_id = str(uuid4())
# Enqueue the task
job = queue.enqueue(process_long_running_task, task_id)

return {
"task_id": task_id,
"job_id": job.id,
"status": "queued",
"message": "Task has been queued for processing"
}

@app.post("/email/send")
async def send_email_task(to_address: str, subject: str, content: str):
# Enqueue email sending task
job = queue.enqueue(send_email, to_address, subject, content)

return {
"job_id": job.id,
"status": "queued",
"message": f"Email to {to_address} has been queued"
}

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
job = queue.fetch_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")

status = {
"job_id": job_id,
"status": job.get_status(),
"result": job.result,
"enqueued_at": job.enqueued_at,
"started_at": job.started_at,
"ended_at": job.ended_at
}

return status

Step 5: Run the application

Now you can run the application with the following commands:

  1. Start the FastAPI application:
bash
uvicorn main:app --reload
  1. Start the RQ worker in a separate terminal:
bash
python worker.py

How It Works

Let's break down the workflow of our application:

  1. When a client makes a request to /tasks/long-running or /email/send, the FastAPI endpoint enqueues a task in Redis Queue
  2. The endpoint immediately returns a response with the job ID, without waiting for the task to complete
  3. The RQ worker, running in a separate process, picks up the task from the queue and executes it
  4. Clients can check the status of their tasks by making requests to /jobs/{job_id}

Advanced Usage: Multiple Queues

In a real-world application, you might want to have different queues for different types of tasks. Let's modify our application to support multiple queues:

python
# In main.py
default_queue = Queue('default', connection=redis_conn)
email_queue = Queue('email', connection=redis_conn)
high_priority_queue = Queue('high', connection=redis_conn)

@app.post("/tasks/long-running")
async def create_long_running_task():
task_id = str(uuid4())
# Use the default queue
job = default_queue.enqueue(process_long_running_task, task_id)
return {"task_id": task_id, "job_id": job.id, "status": "queued"}

@app.post("/email/send")
async def send_email_task(to_address: str, subject: str, content: str):
# Use the email queue
job = email_queue.enqueue(send_email, to_address, subject, content)
return {"job_id": job.id, "status": "queued"}

@app.post("/tasks/high-priority")
async def high_priority_task(task_name: str):
# Use the high priority queue
job = high_priority_queue.enqueue(process_long_running_task, task_name)
return {"job_id": job.id, "status": "queued", "priority": "high"}

Then update the worker to listen to multiple queues:

python
# In worker.py
listen = ['high', 'default', 'email'] # Order matters - high priority first

if __name__ == '__main__':
with Connection(redis_conn):
worker = Worker(list(map(Queue, listen)))
worker.work()

Adding Job Timeouts and Retries

RQ allows you to specify timeouts and automatic retry behavior:

python
# Set a timeout of 30 seconds for this job
job = queue.enqueue(process_long_running_task, task_id, job_timeout=30)

# Retry the job up to 3 times if it fails
job = queue.enqueue(send_email, to_address, subject, content,
retry=Retry(max=3, interval=[10, 30, 60]))

Real-world Example: Report Generation

Here's a more practical example of using Redis Queue for generating reports:

python
# In tasks.py
import pandas as pd
import time
from datetime import datetime

def generate_report(user_id, report_params):
"""
Generate a complex report based on user data.
This simulates a database-intensive operation.
"""
print(f"Generating report for user {user_id} at {datetime.now()}")

# Simulate fetching data from database
time.sleep(5)

# Simulate data processing
time.sleep(3)

# Simulate report creation
report_data = {
"user_id": user_id,
"generated_at": datetime.now().isoformat(),
"report_type": report_params.get("type", "general"),
"data_points": 1000,
"summary": "This is a sample report summary"
}

# In a real application, you might save this to a database or file storage
print(f"Report for user {user_id} completed at {datetime.now()}")
return report_data

# In main.py
@app.post("/reports/generate")
async def generate_user_report(user_id: int, report_type: str):
# Enqueue report generation
job = queue.enqueue(
generate_report,
user_id,
{"type": report_type},
job_timeout=300 # Allow 5 minutes for report generation
)

return {
"job_id": job.id,
"user_id": user_id,
"status": "queued",
"message": "Report generation has been queued"
}

@app.get("/reports/status/{job_id}")
async def get_report_status(job_id: str):
job = queue.fetch_job(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Report job not found")

if job.get_status() == "finished":
return {
"status": "completed",
"report_data": job.result
}

return {
"status": job.get_status(),
"position": job.get_position()
}

Summary

In this tutorial, we've learned:

  1. How to integrate Redis Queue (RQ) with FastAPI for background processing
  2. Setting up workers to process queued tasks
  3. Creating and monitoring background jobs
  4. Working with multiple queues for different priorities
  5. Implementing timeouts and retry mechanisms
  6. Building a practical report generation system

Using Redis Queue with FastAPI provides a powerful way to handle time-consuming tasks without blocking the main request-response cycle, resulting in a more responsive and scalable application.

Additional Resources

Exercises

  1. Implement a background task that processes uploaded images (resizing, format conversion, etc.)
  2. Create a system that sends batch emails with progress tracking
  3. Build a job scheduler that allows scheduling tasks for future execution using RQ Scheduler
  4. Extend the report generation example to save reports to a database and notify users when they're ready
  5. Implement a dashboard that shows the current status of all queues and workers

By combining FastAPI's performance with Redis Queue's simplicity and reliability, you can build powerful asynchronous processing systems that scale well and provide excellent user experiences.



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)