FastAPI Task Results

Introduction

When working with background tasks in FastAPI, you'll often need a way to check on their progress or retrieve their final results. Unlike standard routes, which return values immediately, background tasks execute independently of the main request-response cycle, so you need specialized approaches for managing their results.

In this tutorial, you'll learn how to:

  • Track the status of background tasks
  • Retrieve results from completed background tasks
  • Implement patterns for handling task results efficiently
  • Work with task queue systems like Celery to manage results

Understanding Task Result Patterns

Before diving into implementation details, let's understand the core patterns for handling task results in FastAPI:

  1. Status Endpoints: Creating dedicated endpoints to check task status
  2. WebSockets: Using real-time connections to push updates to clients
  3. Task Queue Systems: Leveraging specialized tools like Celery for result storage
  4. Database Storage: Storing task results in a database for later retrieval

Basic Task Result Tracking

Using In-Memory Storage

For simple applications, you can track task results using in-memory storage:

```python
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import time
import uuid
from typing import Dict, Optional

app = FastAPI()

# Simple in-memory storage for task results
task_results: Dict[str, Dict] = {}

class Task(BaseModel):
    id: str
    status: str
    result: Optional[Dict] = None

@app.post("/tasks/", response_model=Task)
async def create_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    task_results[task_id] = {"status": "processing", "result": None}

    def process_task():
        # Simulate a long-running task
        time.sleep(5)
        # Store the result
        task_results[task_id] = {
            "status": "completed",
            "result": {"message": "Task completed successfully"},
        }

    background_tasks.add_task(process_task)
    return Task(id=task_id, status="processing")

@app.get("/tasks/{task_id}", response_model=Task)
async def get_task_result(task_id: str):
    if task_id not in task_results:
        return {"id": task_id, "status": "not_found"}

    result = task_results[task_id]
    return {"id": task_id, **result}
```

In this example, we:

  1. Create a dictionary to store task results
  2. Generate a unique ID for each task
  3. Update the task status and result when the task completes
  4. Provide an endpoint to check the task status
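
You can exercise these endpoints with a small polling client. A minimal sketch, assuming the app is served at localhost:8000 (for example via `uvicorn main:app`) and that `httpx` is installed:

```python
# poll_task.py -- a minimal client sketch (assumes the app runs at localhost:8000)
import time
import httpx

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host/port

# Kick off a background task
response = httpx.post(f"{BASE_URL}/tasks/")
task_id = response.json()["id"]

# Poll until the task leaves the "processing" state
while True:
    task = httpx.get(f"{BASE_URL}/tasks/{task_id}").json()
    if task["status"] != "processing":
        break
    time.sleep(1)

print(task)  # e.g. {"id": "...", "status": "completed", "result": {...}}
```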

Using Task Results with Database Storage

For production applications, in-memory storage isn't reliable: results vanish when the process restarts and aren't shared across multiple workers. Let's implement the same pattern backed by a database:

```python
from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from sqlalchemy import Column, String, JSON, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, Session
import uuid
import time

# Database setup
SQLALCHEMY_DATABASE_URL = "sqlite:///./task_results.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},  # needed for SQLite with threaded access
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class TaskResult(Base):
    __tablename__ = "task_results"

    id = Column(String, primary_key=True)
    status = Column(String)
    result = Column(JSON, nullable=True)

Base.metadata.create_all(bind=engine)

# FastAPI app
app = FastAPI()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/tasks/")
async def create_task(background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    task_id = str(uuid.uuid4())

    # Create the task record
    db_task = TaskResult(id=task_id, status="processing")
    db.add(db_task)
    db.commit()

    def process_task():
        # Simulate work
        time.sleep(5)

        # Update the record with the result, using a fresh session
        # (the request-scoped session is closed by the time this runs)
        db_session = SessionLocal()
        try:
            task = db_session.query(TaskResult).filter(TaskResult.id == task_id).first()
            task.status = "completed"
            task.result = {"message": "Task completed successfully"}
            db_session.commit()
        finally:
            db_session.close()

    background_tasks.add_task(process_task)
    return {"task_id": task_id, "status": "processing"}

@app.get("/tasks/{task_id}")
async def get_task_result(task_id: str, db: Session = Depends(get_db)):
    task = db.query(TaskResult).filter(TaskResult.id == task_id).first()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    return {"task_id": task.id, "status": task.status, "result": task.result}
```

This approach:

  1. Creates a database table for task results
  2. Stores initial task status when creating the task
  3. Updates the database when the task completes
  4. Provides an endpoint to check current status and result

Implementing Task Results with Celery

For more complex applications, Celery is an excellent choice for handling background tasks and their results:

```python
from fastapi import FastAPI
from celery import Celery
from celery.result import AsyncResult
import time

# Set up Celery with Redis as both broker and result backend
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@celery_app.task
def process_data(data):
    # Simulate processing
    time.sleep(5)
    return {"processed": data, "timestamp": time.time()}

# FastAPI app
app = FastAPI()

@app.post("/tasks/")
async def create_task(data: dict):
    # Submit the task to Celery and get its ID
    task = process_data.delay(data)
    return {"task_id": task.id}

@app.get("/tasks/{task_id}")
async def get_task_result(task_id: str):
    # Look up the task result in the Celery backend
    task_result = AsyncResult(task_id, app=celery_app)

    if task_result.ready():
        if task_result.successful():
            return {"status": "completed", "result": task_result.result}
        # Task failed
        return {"status": "failed", "error": str(task_result.result)}

    # Task is still running
    return {"status": "processing"}
```

Key benefits of the Celery approach:

  1. Built-in result storage and retrieval
  2. Automatic task retry and error handling
  3. Distributed task processing
  4. Result expiration and cleanup (see the configuration sketch below)
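
Several of these come down to a line or two of configuration. A sketch, building on the `celery_app` defined above; the `do_work` helper and `TransientError` exception in the retry example are hypothetical placeholders:

```python
# Expire stored results after one hour (point 4); Celery's backend cleanup
# will then discard them automatically.
celery_app.conf.result_expires = 3600  # seconds

# Cap how long any single task may run
celery_app.conf.task_time_limit = 300  # seconds

# Declare automatic retries on the task itself (point 2)
@celery_app.task(bind=True, max_retries=3, default_retry_delay=10)
def flaky_task(self, data):
    try:
        return do_work(data)  # hypothetical helper
    except TransientError as exc:  # hypothetical exception type
        raise self.retry(exc=exc)
```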

Real-time Task Updates with WebSockets

For applications that need real-time updates, WebSockets provide an efficient solution:

```python
from fastapi import FastAPI, WebSocket
from celery import Celery
import asyncio
import time
import uuid

# Set up Celery
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# Store for active WebSocket connections
active_connections = {}

# FastAPI app
app = FastAPI()

@celery_app.task(bind=True)
def long_running_task(self, task_id):
    # Example task that reports its progress; the task_id argument is unused
    # here because apply_async(task_id=...) already sets the Celery task ID
    total_steps = 10
    for i in range(total_steps):
        # Publish progress to the result backend
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total_steps},
        )
        time.sleep(1)
    return {"status": "completed"}

@app.post("/tasks/")
async def create_task():
    task_id = str(uuid.uuid4())
    long_running_task.apply_async(args=[task_id], task_id=task_id)
    return {"task_id": task_id}

@app.websocket("/ws/tasks/{task_id}")
async def websocket_endpoint(websocket: WebSocket, task_id: str):
    await websocket.accept()
    active_connections[task_id] = websocket

    try:
        while True:
            # Check the task status
            task = celery_app.AsyncResult(task_id)

            if task.state == "PENDING":
                await websocket.send_json({"status": "pending"})
            elif task.state == "PROGRESS":
                await websocket.send_json({
                    "status": "progress",
                    "progress": task.info.get("current", 0),
                    "total": task.info.get("total", 1),
                })
            else:
                # Task completed or failed
                if task.successful():
                    await websocket.send_json({
                        "status": "completed",
                        "result": task.result,
                    })
                else:
                    await websocket.send_json({
                        "status": "failed",
                        "error": str(task.result),
                    })
                break

            await asyncio.sleep(1)  # Poll once per second

    except Exception:
        # Client disconnected
        pass
    finally:
        if task_id in active_connections:
            del active_connections[task_id]
```

This implementation:

  1. Creates a WebSocket connection for each task
  2. Periodically checks the task status
  3. Sends real-time updates about task progress
  4. Closes the connection when the task completes
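
On the client side, a browser would typically consume these messages, but you can test the endpoint with a small script. A sketch using the third-party `websockets` package (an assumption; any WebSocket client will do):

```python
# ws_client.py -- a minimal client sketch (assumes the app runs at localhost:8000
# and the `websockets` package is installed)
import asyncio
import json
import websockets

async def watch_task(task_id: str):
    uri = f"ws://localhost:8000/ws/tasks/{task_id}"
    async with websockets.connect(uri) as ws:
        # Print each status update until the task reaches a terminal state
        async for message in ws:
            update = json.loads(message)
            print(update)
            if update["status"] in ("completed", "failed"):
                break

asyncio.run(watch_task("your-task-id-here"))
```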

Best Practices for Task Results

When implementing task results in FastAPI applications, consider these best practices:

  1. Clean Up Results: Implement a cleanup strategy to prevent accumulating old results
  2. Secure Access: Ensure only authorized users can access task results
  3. Handle Failures: Plan for task failures and provide meaningful error messages
  4. Add Timeouts: Set reasonable timeouts for long-running tasks
  5. Use Result Backends: For production, use a reliable result backend like Redis or PostgreSQL
For example, here is a sketch of authorized access to task results, assuming a `User` model, a `get_current_user` dependency, and a `get_task_by_id` helper defined elsewhere in your application:

```python
# Example of secure task result access (User, get_current_user, and
# get_task_by_id are assumed to exist elsewhere in your application)
@app.get("/tasks/{task_id}")
async def get_task_result(
    task_id: str,
    current_user: User = Depends(get_current_user),
):
    # Get the task
    task = get_task_by_id(task_id)

    # Check that the task belongs to the current user
    if task.user_id != current_user.id:
        raise HTTPException(
            status_code=403,
            detail="Not authorized to access this task",
        )

    # Return the task result
    return {
        "id": task.id,
        "status": task.status,
        "result": task.result,
        "created_at": task.created_at,
    }
```
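
For cleanup (point 1 above), a sketch of a periodic job against the `TaskResult` table from the database example. This assumes the model gains a `created_at` DateTime column, which was not shown earlier, and the one-day retention window is an arbitrary choice:

```python
# cleanup.py -- a sketch of periodic result cleanup, assuming the TaskResult
# model from the database example has a created_at DateTime column
from datetime import datetime, timedelta

RETENTION = timedelta(days=1)  # assumption: keep results for one day

def cleanup_old_results():
    db = SessionLocal()
    try:
        cutoff = datetime.utcnow() - RETENTION
        # Bulk-delete every result older than the cutoff
        deleted = (
            db.query(TaskResult)
            .filter(TaskResult.created_at < cutoff)
            .delete()
        )
        db.commit()
        print(f"Removed {deleted} old task results")
    finally:
        db.close()

# Run this from a cron job, a Celery beat schedule, or a startup-launched loop.
```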

Practical Example: Image Processing Service

Let's put these patterns together in a practical example, an image processing service:

```python
from fastapi import FastAPI, UploadFile, File, BackgroundTasks, Depends, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from PIL import Image
import uuid
import os
import time

# Database models and setup (TaskResult, SessionLocal, get_db) as in the
# earlier database example; omitted here for brevity

app = FastAPI()

class ImageProcessor:
    @staticmethod
    def resize_image(input_path, output_path, size=(800, 600)):
        """Resize an image and save it to the output path."""
        img = Image.open(input_path)
        img = img.resize(size, Image.LANCZOS)
        img.save(output_path)
        return output_path

@app.post("/images/process/")
async def process_image(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    # Generate a unique ID for this task
    task_id = str(uuid.uuid4())

    # Build the input and output paths
    input_path = f"uploads/{task_id}-{file.filename}"
    os.makedirs("uploads", exist_ok=True)

    output_path = f"processed/{task_id}-{file.filename}"
    os.makedirs("processed", exist_ok=True)

    # Save the uploaded file
    with open(input_path, "wb") as f:
        content = await file.read()
        f.write(content)

    # Create the task record
    task = TaskResult(
        id=task_id,
        status="processing",
        result={"original_filename": file.filename},
    )
    db.add(task)
    db.commit()

    # Start the background task
    def process_image_task():
        try:
            # Process the image
            ImageProcessor.resize_image(input_path, output_path)

            # Update the task status
            db_session = SessionLocal()
            try:
                task = db_session.query(TaskResult).filter(TaskResult.id == task_id).first()
                task.status = "completed"
                task.result = {
                    **task.result,
                    "processed_path": output_path,
                    "completed_at": time.time(),
                }
                db_session.commit()
            finally:
                db_session.close()
        except Exception as e:
            # Record the failure
            db_session = SessionLocal()
            try:
                task = db_session.query(TaskResult).filter(TaskResult.id == task_id).first()
                task.status = "failed"
                task.result = {**task.result, "error": str(e)}
                db_session.commit()
            finally:
                db_session.close()

    background_tasks.add_task(process_image_task)

    return {"task_id": task_id, "status": "processing"}

@app.get("/images/{task_id}")
async def get_processed_image(task_id: str, db: Session = Depends(get_db)):
    # Look up the task
    task = db.query(TaskResult).filter(TaskResult.id == task_id).first()

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.status == "processing":
        return {"status": "processing", "task_id": task_id}
    elif task.status == "failed":
        return {"status": "failed", "error": task.result.get("error")}
    else:
        # Return the processed image file
        return FileResponse(
            task.result.get("processed_path"),
            media_type="image/jpeg",
            filename=f"processed-{task.result.get('original_filename')}",
        )
```

This example:

  1. Accepts an uploaded image
  2. Processes it in the background
  3. Stores the processing status and result
  4. Provides an endpoint to retrieve the processed image
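
To exercise the service end to end, a client sketch, assuming the app runs at localhost:8000, `httpx` is installed, and a local `photo.jpg` exists:

```python
# upload_client.py -- a sketch of exercising the image service
import time
import httpx

BASE_URL = "http://localhost:8000"  # assumption: default uvicorn host/port

# Upload an image for processing
with open("photo.jpg", "rb") as f:
    response = httpx.post(
        f"{BASE_URL}/images/process/",
        files={"file": ("photo.jpg", f, "image/jpeg")},
    )
task_id = response.json()["task_id"]

# Poll until the processed image comes back, then save it
while True:
    result = httpx.get(f"{BASE_URL}/images/{task_id}")
    if result.headers["content-type"].startswith("image/"):
        with open("processed.jpg", "wb") as out:
            out.write(result.content)
        break
    if result.json().get("status") == "failed":
        raise RuntimeError(result.json().get("error"))
    time.sleep(1)
```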

Summary

Managing background task results in FastAPI applications requires thoughtful consideration of storage, retrieval, and status tracking. The approaches we've explored range from simple in-memory solutions to sophisticated Celery-based systems with real-time updates.

Key takeaways:

  • Use unique task IDs to track and retrieve results
  • Store task status and results in a database for persistence
  • Consider Celery for complex task management requirements
  • Implement WebSockets for real-time status updates
  • Follow security and cleanup best practices

By implementing these patterns, you can create robust FastAPI applications that handle long-running tasks efficiently while providing users with visibility into task progress and results.

Exercises

  1. Create a simple task system that processes text (counts words, analyzes sentiment) and returns results
  2. Implement a task progress bar using WebSockets
  3. Build a task cleanup system that automatically removes old task results
  4. Add user authentication to your task system so users can only see their own tasks
  5. Create a dashboard for monitoring all active and completed tasks

