FastAPI Concurrent Tasks
When building web applications, you often need to perform operations that might take a significant amount of time but shouldn't block the main request processing. FastAPI provides elegant solutions for handling concurrent tasks that can run in the background while your API continues to serve other requests.
Introduction to Concurrent Tasks in FastAPI
Concurrent tasks allow your application to perform multiple operations simultaneously without blocking the main execution thread. This is especially important in web applications where responsiveness is critical. FastAPI leverages Python's asynchronous capabilities to make concurrent programming straightforward and efficient.
There are two main approaches to handling background tasks in FastAPI:
- Background tasks - for short-lived operations that should happen after a response is sent (a minimal example follows below)
- Concurrent tasks - for more complex operations that can run independently
In this tutorial, we'll focus on implementing true concurrent tasks using FastAPI's ecosystem.
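For comparison, here is a minimal sketch of the first approach using FastAPI's built-in BackgroundTasks. The write_log function and its log file are made up for illustration; only the BackgroundTasks usage itself reflects FastAPI's API.

# A minimal sketch of the BackgroundTasks approach
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
    # Runs after the response has been sent to the client
    with open("log.txt", "a") as f:
        f.write(message + "\n")

@app.post("/notify/{email}")
async def notify(email: str, background_tasks: BackgroundTasks):
    # Schedule write_log to run once the response is returned
    background_tasks.add_task(write_log, f"Notification queued for {email}")
    return {"message": "Notification will be sent"}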
Understanding Concurrency vs. Parallelism
Before diving into implementation, let's clarify two related concepts:
- Concurrency: Managing multiple tasks and making progress on them. Tasks may start, run, and complete in overlapping time periods.
- Parallelism: Actually executing multiple tasks simultaneously (requires multiple CPU cores).
FastAPI uses Python's async/await syntax for concurrency, which allows your application to handle multiple operations without being blocked by long-running tasks.
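As a quick illustration of concurrency with async/await, the sketch below runs two coroutines whose waits overlap, so the total time is roughly that of the slowest one rather than the sum. The coroutine names and delays are made up for the example.

import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    # asyncio.sleep yields control, so other coroutines can run in the meantime
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    start = time.perf_counter()
    # Both coroutines make progress during each other's waits
    results = await asyncio.gather(fetch("a", 2), fetch("b", 3))
    print(results, f"took ~{time.perf_counter() - start:.1f}s")  # ~3s, not 5s

asyncio.run(main())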
Setting Up for Concurrent Tasks
First, let's set up a basic FastAPI application with the required dependencies:
# requirements.txt
fastapi==0.95.0
uvicorn==0.21.1
Our basic application structure:
# main.py
from fastapi import FastAPI

app = FastAPI(title="Concurrent Tasks Demo")

@app.get("/")
async def root():
    return {"message": "Hello World"}
Implementing Concurrent Tasks
Method 1: Using asyncio.create_task
Python's asyncio library provides tools to create and manage concurrent tasks. Here's how to implement a simple concurrent task:
import asyncio
from fastapi import FastAPI

app = FastAPI(title="Concurrent Tasks Demo")

async def process_data(data: str):
    """Simulate a time-consuming process"""
    print(f"Started processing {data}")
    # Simulate an I/O intensive task
    await asyncio.sleep(5)  # Non-blocking sleep
    print(f"Finished processing {data}")
    return f"Processed: {data}"

@app.get("/process/{item_id}")
async def process_item(item_id: str):
    # Create a task that will run concurrently
    task = asyncio.create_task(process_data(f"item-{item_id}"))
    # The endpoint returns immediately without waiting for the task to complete
    return {"message": f"Processing of item {item_id} started in background"}
When you call the /process/{item_id} endpoint, the response will be immediate, but the task continues to run in the background.
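One caveat worth knowing: the event loop keeps only a weak reference to tasks created with asyncio.create_task, so a task that is otherwise unreferenced can in principle be garbage collected before it finishes. A common pattern, sketched below as a variant of the endpoint above, is to hold strong references in a module-level set:

# Keep strong references so in-flight tasks are not garbage collected
background_tasks = set()

@app.get("/process/{item_id}")
async def process_item(item_id: str):
    task = asyncio.create_task(process_data(f"item-{item_id}"))
    background_tasks.add(task)
    # Drop the reference once the task is done
    task.add_done_callback(background_tasks.discard)
    return {"message": f"Processing of item {item_id} started in background"}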
Method 2: Using External Task Queues with Celery
For more robust concurrent task processing, especially for tasks that need to survive application restarts, we can use Celery with message brokers like Redis or RabbitMQ:
First, install the required dependencies:
# requirements.txt additions
celery==5.2.7
redis==4.5.4
Now, let's implement a Celery-based task system:
# tasks.py
import time

from celery import Celery

# Configure Celery with Redis as the broker and result backend
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0"
)

@celery_app.task
def process_data_task(data: str):
    """A long-running task that will be executed by Celery workers"""
    print(f"Started processing {data}")
    time.sleep(5)  # Simulate work (using real sleep, not asyncio.sleep)
    print(f"Finished processing {data}")
    return f"Processed: {data}"
Integrating with our FastAPI application:
# main.py with Celery integration
from fastapi import FastAPI
from tasks import process_data_task

app = FastAPI(title="Concurrent Tasks Demo")

@app.get("/process-celery/{item_id}")
async def process_with_celery(item_id: str):
    # Schedule a task with Celery
    task = process_data_task.delay(f"item-{item_id}")
    # Return task ID that can be used for status checks
    return {"message": "Processing started", "task_id": task.id}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    # Check the status of a Celery task
    task = process_data_task.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.get()}
    return {"status": "processing"}
To run this system, you'll need:
- A Redis server running (or another message broker) - see the quick-start command below
- A Celery worker process:
  celery -A tasks worker --loglevel=info
- Your FastAPI application:
  uvicorn main:app --reload
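If you don't already have Redis installed, one quick way to start a local instance for development (assuming Docker is available) is:

docker run --rm -p 6379:6379 redis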
Managing Concurrent Task States
When working with concurrent tasks, managing their state becomes important:
import asyncio
import uuid
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Task State Management Demo")

# Dictionary to store task statuses
task_store = {}

async def long_running_task(task_id: str):
    """Simulate a long running task with multiple states"""
    task_store[task_id] = {"status": "running", "progress": 0}
    for i in range(1, 11):
        await asyncio.sleep(1)  # Simulate work being done
        task_store[task_id] = {"status": "running", "progress": i * 10}
    task_store[task_id] = {"status": "completed", "progress": 100, "result": "Task finished successfully"}

@app.post("/start-task/", status_code=202)
async def start_task():
    """Start a new background task and return a task ID"""
    task_id = str(uuid.uuid4())
    # Start the task without awaiting completion
    asyncio.create_task(long_running_task(task_id))
    return {"task_id": task_id, "status": "started"}

@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
    """Check the status of a task"""
    if task_id not in task_store:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_store[task_id]
This approach lets clients start a task and then poll for its status until completion.
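To see the polling flow end to end, here is a small client-side sketch. It assumes the server runs on localhost:8000 and that the httpx library is installed (the requests library would work the same way):

import time

import httpx  # pip install httpx

BASE_URL = "http://localhost:8000"

# Start a task and remember its ID
task_id = httpx.post(f"{BASE_URL}/start-task/").json()["task_id"]

# Poll until the task reports completion
while True:
    status = httpx.get(f"{BASE_URL}/task-status/{task_id}").json()
    print(status)
    if status.get("status") == "completed":
        break
    time.sleep(1)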
Error Handling in Concurrent Tasks
Error handling is crucial when working with concurrent tasks. Here's how to implement robust error handling:
import asyncio
import logging
from fastapi import FastAPI

app = FastAPI()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def risky_task(data: str):
    """A task that might fail"""
    try:
        logger.info(f"Processing {data}")
        if "error" in data:
            # Simulate an error condition
            raise ValueError(f"Error processing {data}")
        await asyncio.sleep(3)
        return f"Success: {data}"
    except Exception as e:
        logger.error(f"Task failed: {str(e)}")
        # Could also store this error in a database or notify admins
        raise

@app.get("/safe-task/{item}")
async def execute_safe_task(item: str):
    """Execute a task with error handling"""
    try:
        # Create the task; exceptions raised inside it will not propagate here
        task = asyncio.create_task(risky_task(item))

        # We're not awaiting the task, so we set up a way to handle future errors.
        # This function will be called when the task completes or fails.
        def handle_task_result(future):
            try:
                # This retrieves the result or re-raises the exception
                future.result()
            except Exception as e:
                logger.error(f"Background task failed: {str(e)}")

        # Add the callback to handle the result
        task.add_done_callback(handle_task_result)
        return {"message": "Task started"}
    except Exception as e:
        return {"error": str(e)}
Real-World Example: Image Processing Service
Let's build a more practical example - an image processing service that handles image resizing in the background:
import asyncio
import base64
import io
import uuid
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from PIL import Image  # You'll need to pip install Pillow

app = FastAPI()

# Store for keeping track of image processing tasks
image_tasks = {}

async def resize_image(image_data: bytes, task_id: str):
    """Resize an image asynchronously"""
    try:
        image_tasks[task_id] = {"status": "processing"}
        # Simulate delay in processing
        await asyncio.sleep(3)
        # Process the image (resize to thumbnail)
        img = Image.open(io.BytesIO(image_data))
        # JPEG has no alpha channel, so convert to RGB before saving
        img = img.convert("RGB")
        img.thumbnail((100, 100))
        # Save the result
        output = io.BytesIO()
        img.save(output, format="JPEG")
        output.seek(0)
        # Store the result as base64
        result = base64.b64encode(output.getvalue()).decode('utf-8')
        image_tasks[task_id] = {
            "status": "completed",
            "thumbnail": result
        }
    except Exception as e:
        image_tasks[task_id] = {
            "status": "error",
            "error": str(e)
        }

@app.post("/resize-image/")
async def create_resize_task(image: UploadFile = File(...)):
    """Start an image resizing task"""
    task_id = str(uuid.uuid4())
    contents = await image.read()
    # Start the resizing task
    asyncio.create_task(resize_image(contents, task_id))
    return JSONResponse(
        status_code=202,
        content={
            "task_id": task_id,
            "status": "processing",
            "message": "Image resize task started"
        }
    )

@app.get("/resize-status/{task_id}")
async def get_resize_status(task_id: str):
    """Check the status of an image resizing task"""
    if task_id not in image_tasks:
        return JSONResponse(
            status_code=404,
            content={"error": "Task not found"}
        )
    return image_tasks[task_id]
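To try the service from the command line, assuming the app runs on localhost:8000 and you have a local file named photo.jpg (the file name and host are placeholders):

# Upload an image; the form field name must match the endpoint's "image" parameter
curl -F "image=@photo.jpg" http://localhost:8000/resize-image/

# Poll for the result using the returned task_id
curl http://localhost:8000/resize-status/<task_id>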
Best Practices for Concurrent Tasks
- Keep tasks small and focused - Large, complex tasks are harder to manage and debug
- Use proper timeouts - Always implement timeouts to prevent tasks from running indefinitely:

async def task_with_timeout(data):
    try:
        # The task will be cancelled if it takes more than 30 seconds
        return await asyncio.wait_for(some_long_operation(data), timeout=30.0)
    except asyncio.TimeoutError:
        logger.error("Task timed out")
        return {"error": "The operation timed out"}

- Limit concurrency - Avoid starting too many concurrent tasks as it can exhaust system resources:

# Create a semaphore to limit concurrent tasks
semaphore = asyncio.Semaphore(10)  # Maximum 10 concurrent tasks

async def limited_task(data):
    async with semaphore:
        # Only 10 tasks will be running these lines at once
        return await some_long_operation(data)

- Monitor and log - Always implement proper logging to track the execution of your concurrent tasks
- Consider task persistence - For critical tasks, use a system that can survive application restarts (like Celery)
Summary
In this tutorial, we've explored how to effectively implement concurrent tasks in FastAPI:
- Using asyncio.create_task for simple in-process tasks
- Implementing Celery for distributed task processing
- Managing task states and handling errors
- Building a practical image processing service
- Following best practices for robust concurrent task handling
Concurrent tasks in FastAPI allow you to build responsive, efficient APIs that can handle computationally or I/O intensive operations without blocking the main request handling flow.
Exercises
- Build a FastAPI application that processes CSV files in the background and extracts statistics
- Implement a concurrent email sending service with status tracking
- Create a system that periodically fetches data from an external API and updates a local database
- Extend the image processing service to support multiple operations (resize, crop, filter) with a task queue