FastAPI Task Monitoring
In production applications, executing tasks in the background is only half the battle. Knowing the status of those tasks, troubleshooting failures, and understanding performance metrics is equally important. In this tutorial, we'll explore how to implement effective monitoring for your FastAPI background tasks.
Why Monitor Background Tasks?
Background tasks operate asynchronously, detached from the main request flow. This separation provides several advantages but makes it challenging to know:
- Has the task started?
- Is it still running?
- Did it complete successfully?
- How long did it take?
- What went wrong if it failed?
Proper monitoring answers these questions and helps maintain reliable services.
Basic Logging for Background Tasks
Let's start with a simple approach: implementing logging in your background tasks.
import logging
import time

from fastapi import FastAPI, BackgroundTasks

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI()

def process_data(item_id: int):
    logger.info(f"Starting background task for item {item_id}")
    try:
        # Simulate processing time
        time.sleep(5)
        logger.info(f"Background task for item {item_id} completed successfully")
    except Exception as e:
        logger.error(f"Background task for item {item_id} failed: {str(e)}")
        raise

@app.post("/items/{item_id}/process")
async def process_item(item_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_data, item_id)
    return {"message": "Item processing started"}
This basic approach provides visibility through logs but lacks structured tracking for multiple tasks.
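If you want these log lines to be machine-parseable, one lightweight option is to emit each record as a JSON line. Here's a minimal sketch using only the standard library; the field names are arbitrary choices, not a fixed convention:

import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)

With this in place, log aggregators can filter and query task events by field instead of parsing free-form text.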
Creating a Task Registry
To get better insights, let's implement a simple task registry to track task status:
import time
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel

app = FastAPI()

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskInfo(BaseModel):
    id: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    result: Any = None
    error: Optional[str] = None

# Our in-memory task registry
task_registry: Dict[str, TaskInfo] = {}

def run_task(task_id: str, seconds: int):
    # Update status to running
    update_task_status(task_id, TaskStatus.RUNNING)
    try:
        # Simulate work
        time.sleep(seconds)
        # Update task with results
        update_task_status(
            task_id,
            TaskStatus.COMPLETED,
            result=f"Task completed after {seconds} seconds"
        )
    except Exception as e:
        # Update task with error
        update_task_status(
            task_id,
            TaskStatus.FAILED,
            error=str(e)
        )

def update_task_status(task_id: str, status: TaskStatus, result=None, error=None):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        if result is not None:
            task.result = result
        if error is not None:
            task.error = error

@app.post("/tasks/", response_model=TaskInfo)
async def create_task(background_tasks: BackgroundTasks, seconds: int = 10):
    # Create a unique ID for this task
    task_id = str(uuid.uuid4())
    # Register the task
    task_info = TaskInfo(
        id=task_id,
        status=TaskStatus.PENDING,
        created_at=datetime.now(),
        updated_at=datetime.now()
    )
    task_registry[task_id] = task_info
    # Start the background task
    background_tasks.add_task(run_task, task_id, seconds)
    return task_info

@app.get("/tasks/{task_id}", response_model=TaskInfo)
async def get_task(task_id: str):
    if task_id not in task_registry:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_registry[task_id]

@app.get("/tasks/")
async def list_tasks():
    return list(task_registry.values())
With this implementation:
- Each task gets a unique ID
- Tasks progress through defined states (pending, running, completed, failed)
- We track creation and update times
- We capture results or error messages
- We can query task status via API endpoints
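As a quick usage example, assuming the app is running locally on uvicorn's default port (8000), you can create a task and then poll its status:

import requests

# Create a task that "works" for 5 seconds
response = requests.post("http://localhost:8000/tasks/", params={"seconds": 5})
task = response.json()
print(task["id"], task["status"])  # a fresh UUID, "pending"

# Poll its status until it finishes
status = requests.get(f"http://localhost:8000/tasks/{task['id']}").json()
print(status["status"])  # "running" while working, then "completed"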
Adding Progress Tracking
For long-running tasks, tracking progress percentage can be valuable:
class TaskInfo(BaseModel):
    id: str
    status: TaskStatus
    created_at: datetime
    updated_at: datetime
    progress: float = 0.0  # Progress from 0 to 100
    result: Any = None
    error: Optional[str] = None

def process_large_data(task_id: str, items: int):
    update_task_status(task_id, TaskStatus.RUNNING)
    try:
        for i in range(items):
            # Do some work
            time.sleep(0.5)
            # Update progress
            progress = (i + 1) / items * 100
            update_task_status(
                task_id,
                TaskStatus.RUNNING,
                progress=progress
            )
        update_task_status(
            task_id,
            TaskStatus.COMPLETED,
            progress=100.0,
            result=f"Processed {items} items successfully"
        )
    except Exception as e:
        update_task_status(
            task_id,
            TaskStatus.FAILED,
            error=str(e)
        )
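Note that the earlier update_task_status only accepted result and error, so it needs a small extension to handle the progress keyword used above:

def update_task_status(task_id: str, status: TaskStatus,
                       result=None, error=None, progress: float = None):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        if progress is not None:
            task.progress = progress
        if result is not None:
            task.result = result
        if error is not None:
            task.error = error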
Implementing a More Robust Solution
For production use, consider these enhancements:
1. Using Redis for Task Storage
In-memory storage is lost on every restart and isn't shared across worker processes. Redis provides a better solution for task tracking:
import json
import time
from datetime import datetime

import redis
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def store_task_info(task_id: str, task_info: dict):
    redis_client.set(f"task:{task_id}", json.dumps(task_info))
    # Set expiration time (e.g., keep task info for 24 hours)
    redis_client.expire(f"task:{task_id}", 86400)

def get_task_info(task_id: str):
    data = redis_client.get(f"task:{task_id}")
    if data:
        return json.loads(data)
    return None

def run_background_task(task_id: str):
    # Update task to running
    task_info = get_task_info(task_id)
    task_info["status"] = "running"
    task_info["updated_at"] = str(datetime.now())
    store_task_info(task_id, task_info)
    try:
        # Do work...
        time.sleep(10)
        # Update to completed
        task_info = get_task_info(task_id)
        task_info["status"] = "completed"
        task_info["updated_at"] = str(datetime.now())
        task_info["result"] = "Task completed successfully"
        store_task_info(task_id, task_info)
    except Exception as e:
        # Update with error
        task_info = get_task_info(task_id)
        task_info["status"] = "failed"
        task_info["updated_at"] = str(datetime.now())
        task_info["error"] = str(e)
        store_task_info(task_id, task_info)
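To round this out, here's one way the endpoints might look on top of these helpers. This is a sketch mirroring the in-memory version; the paths and payload shape are assumptions, not a fixed API:

import uuid
from fastapi import HTTPException

@app.post("/tasks/")
async def create_task(background_tasks: BackgroundTasks):
    task_id = str(uuid.uuid4())
    # Register the task in Redis before scheduling it
    store_task_info(task_id, {
        "id": task_id,
        "status": "pending",
        "created_at": str(datetime.now()),
        "updated_at": str(datetime.now()),
    })
    background_tasks.add_task(run_background_task, task_id)
    return {"task_id": task_id}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    task_info = get_task_info(task_id)
    if task_info is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return task_info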
2. Using Celery for Advanced Monitoring
For even more advanced monitoring, consider integrating Celery with FastAPI:
from fastapi import FastAPI
from celery import Celery
from celery.result import AsyncResult

# Configure Celery
celery_app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

app = FastAPI()

@celery_app.task
def process_data(data):
    # Long-running process here
    return {"result": "Data processed successfully"}

@app.post("/process/")
async def start_process(data: dict):
    # Start a Celery task
    task = process_data.delay(data)
    return {"task_id": task.id}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    # Get task status from Celery
    task_result = AsyncResult(task_id, app=celery_app)
    # Return task status information
    result = {
        "task_id": task_id,
        "status": task_result.status
    }
    # Include result or error if available
    if task_result.ready():
        if task_result.successful():
            result["result"] = task_result.get()
        else:
            # If the task failed, get the exception info
            result["error"] = str(task_result.result)
    return result
Celery provides a comprehensive task monitoring system including:
- Task status tracking
- Result storage
- Automatic retries
- Error handling
- Task inspection and statistics
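For example, Celery's remote control API lets you inspect workers at runtime. A minimal sketch, assuming at least one worker is running against the broker configured above:

# Inspect running workers via Celery's remote control API
inspector = celery_app.control.inspect()

print(inspector.active())    # tasks currently executing, per worker
print(inspector.reserved())  # tasks received but not yet started
print(inspector.stats())     # per-worker statistics (pool size, totals, etc.)

Tools like Flower build a full web dashboard on top of this same API.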
Implementing a WebSocket for Real-time Monitoring
For real-time monitoring, we can use WebSockets to push task updates to clients:
import asyncio
from typing import Dict, List

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.encoders import jsonable_encoder

app = FastAPI()

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, task_id: str):
        await websocket.accept()
        if task_id not in self.active_connections:
            self.active_connections[task_id] = []
        self.active_connections[task_id].append(websocket)

    def disconnect(self, websocket: WebSocket, task_id: str):
        if task_id in self.active_connections:
            self.active_connections[task_id].remove(websocket)

    async def send_task_update(self, task_id: str, data: dict):
        if task_id in self.active_connections:
            for connection in self.active_connections[task_id]:
                await connection.send_json(data)

manager = ConnectionManager()

# When task status changes, notify connected clients.
# This reuses task_registry and TaskStatus from the registry example above.
def update_task_status(task_id: str, status: TaskStatus, **kwargs):
    task = task_registry.get(task_id)
    if task:
        task.status = status
        task.updated_at = datetime.now()
        # Update other fields
        for key, value in kwargs.items():
            if hasattr(task, key):
                setattr(task, key, value)
        # Notify clients about the update. Note: create_task requires a running
        # event loop, so this works when called from an async background task;
        # from a worker thread, use asyncio.run_coroutine_threadsafe instead.
        asyncio.create_task(manager.send_task_update(
            task_id,
            jsonable_encoder(task)  # converts datetimes to JSON-safe values
        ))

@app.websocket("/ws/tasks/{task_id}")
async def websocket_task_endpoint(websocket: WebSocket, task_id: str):
    await manager.connect(websocket, task_id)
    try:
        # Send initial task status
        if task_id in task_registry:
            await websocket.send_json(jsonable_encoder(task_registry[task_id]))
        # Keep connection alive
        while True:
            # Wait for any message from client to keep connection alive
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket, task_id)
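To try it out, FastAPI's TestClient can open a WebSocket connection. A quick sketch; the task ID below is a placeholder:

from fastapi.testclient import TestClient

client = TestClient(app)

# "some-task-id" is a placeholder; use an ID returned by POST /tasks/
with client.websocket_connect("/ws/tasks/some-task-id") as websocket:
    update = websocket.receive_json()  # initial status snapshot
    print(update["status"])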
Best Practices for Task Monitoring
- Use unique task IDs: Each task should have a unique identifier for tracking.
- Include timestamps: Track when tasks are created, started, and completed.
- Implement structured logging: Use a consistent format for easy parsing.
- Add progress reporting: For long-running tasks, report percentage completion.
- Store task history: Keep a record of past tasks for analysis.
- Set up alerts: Get notified when tasks fail or take too long.
- Implement timeout handling: Prevent tasks from running indefinitely (see the sketch after this list).
- Add resource usage monitoring: Track memory and CPU usage.
- Create a monitoring dashboard: Visualize task performance and status.
- Clean up completed tasks: Implement a retention policy to clear old task data.
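As one example of timeout handling, asyncio.wait_for can cap an async task's runtime. A minimal sketch reusing the registry helpers from earlier; the 300-second default is an arbitrary choice:

import asyncio

async def run_with_timeout(task_id: str, seconds: int, timeout: float = 300.0):
    update_task_status(task_id, TaskStatus.RUNNING)
    try:
        # asyncio.sleep stands in for real async work
        await asyncio.wait_for(asyncio.sleep(seconds), timeout=timeout)
        update_task_status(task_id, TaskStatus.COMPLETED,
                           result=f"Task completed after {seconds} seconds")
    except asyncio.TimeoutError:
        update_task_status(task_id, TaskStatus.FAILED,
                           error=f"Task timed out after {timeout} seconds")

Because the function is async, FastAPI will await it on the event loop when scheduled via background_tasks.add_task(run_with_timeout, task_id, seconds).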
Summary
Monitoring background tasks is essential for maintaining robust FastAPI applications. In this tutorial, we've explored:
- Basic logging for background tasks
- Creating a task registry for status tracking
- Implementing progress reporting
- Using Redis for persistent task storage
- Leveraging Celery for advanced monitoring
- Setting up WebSockets for real-time monitoring
By implementing proper monitoring, you'll gain visibility into your background processes, enabling faster debugging and more reliable applications.
Further Resources
- FastAPI official documentation on background tasks
- Celery documentation
- Redis documentation
- Prometheus for metrics collection
- Grafana for visualization
Exercises
- Enhance the task registry to include execution time metrics.
- Create a simple web dashboard that displays active tasks and their status.
- Implement a notification system that alerts when tasks fail.
- Add the ability to cancel running background tasks.
- Integrate with Prometheus to collect metrics on task performance.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)