FastAPI Task Scheduling
Introduction
In modern web applications, executing tasks at specific times or intervals is a common requirement. Think of sending daily email digests, cleaning up database records weekly, or fetching external data every hour. This is where task scheduling comes in.
FastAPI doesn't provide built-in scheduling capabilities, but we can easily integrate third-party libraries to add this functionality. In this tutorial, we'll learn how to implement task scheduling in FastAPI applications using the popular APScheduler library.
By the end of this lesson, you'll be able to:
- Set up scheduled tasks in your FastAPI application
- Configure different types of schedules (interval, cron, one-time)
- Manage scheduled tasks dynamically
Prerequisites
Before getting started, make sure you have:
- Basic FastAPI knowledge
- Python 3.7+ installed
- A FastAPI project set up
Setting Up APScheduler with FastAPI
First, let's install the required packages:
pip install fastapi apscheduler
Now, let's create a basic FastAPI application with scheduler integration:
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime

app = FastAPI(title="Task Scheduling Demo")

# Create a scheduler instance
scheduler = BackgroundScheduler()
# Start the scheduler
scheduler.start()

@app.on_event("shutdown")
def shutdown_event():
    scheduler.shutdown()

@app.get("/")
def read_root():
    return {"message": "Task scheduling service is running"}
In the code above, we:
- Import the necessary modules
- Create a FastAPI application
- Initialize a BackgroundScheduler for our scheduled tasks
- Register a shutdown event to properly close the scheduler when our app terminates
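With the scheduler wired up, you can run the app as usual. Assuming the code above lives in a file called main.py (the filename is just a placeholder), start it with uvicorn, installing it first if you don't already have it:
pip install uvicorn
uvicorn main:app --reload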
Types of Scheduling in APScheduler
APScheduler supports three main types of schedules:
- Interval-based scheduling: Run a task every X seconds/minutes/hours
- Cron-based scheduling: Run a task at specific times using cron expressions
- One-time scheduling: Run a task only once at a specific time
Let's see examples of each:
1. Interval-based Scheduling
Let's add a job that logs a message every 30 seconds:
def log_current_time():
    print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Schedule the job to run every 30 seconds
scheduler.add_job(log_current_time, 'interval', seconds=30, id='log_time_job')
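The interval trigger also accepts larger units such as minutes, hours, days, and weeks, plus an optional start_date. As a variation on the job above (the start date here is an arbitrary placeholder):
# Run every 2 hours, but not before the given (placeholder) start date
scheduler.add_job(
    log_current_time,
    'interval',
    hours=2,
    start_date='2025-01-01 09:00:00',
    id='log_time_every_two_hours'
)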
2. Cron-based Scheduling
Now let's create a task that runs at a specific time using a cron expression:
def daily_summary_report():
    print(f"Generating daily summary report at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # Add your reporting logic here

# Run every day at 8:00 AM
scheduler.add_job(
    daily_summary_report,
    'cron',
    hour=8,
    minute=0,
    id='daily_summary_job'
)
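If you already have standard crontab expressions, APScheduler's CronTrigger can parse them directly via CronTrigger.from_crontab. Here's a short sketch that runs the same report at 8:00 AM on weekdays only:
from apscheduler.triggers.cron import CronTrigger

# Same report, expressed as a crontab string: 8:00 AM, Monday through Friday
scheduler.add_job(
    daily_summary_report,
    CronTrigger.from_crontab('0 8 * * mon-fri'),
    id='weekday_summary_job'
)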
3. One-time Scheduling
For tasks that should only run once at a specific time:
from datetime import datetime, timedelta

def one_time_task():
    print(f"Executing one-time task at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

# Schedule a job to run once, 2 minutes from now
run_time = datetime.now() + timedelta(minutes=2)
scheduler.add_job(
    one_time_task,
    'date',
    run_date=run_time,
    id='one_time_job'
)
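A 'date' job is removed from the scheduler automatically after it fires. The run_date can also be passed as a string instead of a datetime object; the timestamp below is just a placeholder:
# One-off job with a hard-coded (placeholder) timestamp
scheduler.add_job(
    one_time_task,
    'date',
    run_date='2025-12-31 23:59:00',
    id='year_end_job'
)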
Creating API Endpoints to Manage Scheduled Tasks
Let's create some API endpoints to dynamically add, remove, and list scheduled tasks:
from pydantic import BaseModel
from typing import Optional
class IntervalTaskModel(BaseModel):
    job_id: str
    seconds: Optional[int] = None
    minutes: Optional[int] = None
    hours: Optional[int] = None
    task_description: str

@app.post("/schedule/interval")
def create_interval_task(task: IntervalTaskModel):
    def scheduled_task():
        print(f"Executing task: {task.task_description}")
        # Add your actual task logic here

    try:
        # Add the job to the scheduler (unset time units default to 0)
        scheduler.add_job(
            scheduled_task,
            'interval',
            seconds=task.seconds or 0,
            minutes=task.minutes or 0,
            hours=task.hours or 0,
            id=task.job_id
        )
        return {"status": "success", "message": f"Task {task.job_id} scheduled successfully"}
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.delete("/schedule/{job_id}")
def remove_scheduled_task(job_id: str):
    try:
        scheduler.remove_job(job_id)
        return {"status": "success", "message": f"Task {job_id} removed successfully"}
    except Exception as e:
        return {"status": "error", "message": str(e)}

@app.get("/schedule")
def get_scheduled_tasks():
    jobs = []
    for job in scheduler.get_jobs():
        jobs.append({
            "id": job.id,
            "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job.next_run_time else None,
            "type": str(job.trigger)
        })
    return {"jobs": jobs}
This provides a simple API to:
- Create new interval-based scheduled tasks
- Remove existing tasks by ID
- List all scheduled tasks
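With the app running locally (the host and port below assume uvicorn's defaults), you can try these endpoints from the command line:
# Create a task that fires every 60 seconds
curl -X POST http://localhost:8000/schedule/interval \
  -H "Content-Type: application/json" \
  -d '{"job_id": "demo_job", "seconds": 60, "task_description": "Demo task"}'

# List all scheduled tasks
curl http://localhost:8000/schedule

# Remove the task again
curl -X DELETE http://localhost:8000/schedule/demo_job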
Practical Example: Database Cleanup with Scheduled Tasks
Let's implement a practical example that cleans up old records from a database at regular intervals:
from sqlalchemy.orm import Session
from datetime import datetime, timedelta

# Import your database models and session dependency
# from database import get_db, ExpiredRecords

def cleanup_old_records():
    # Scheduled jobs run outside the request cycle, so obtain a session manually
    db: Session = next(get_db())
    try:
        # Define the cutoff date (e.g., 30 days ago)
        cutoff_date = datetime.now() - timedelta(days=30)
        # Delete records older than the cutoff date
        deleted_count = db.query(ExpiredRecords).filter(
            ExpiredRecords.created_at < cutoff_date
        ).delete()
        db.commit()
        print(f"Database cleanup completed. Deleted {deleted_count} old records.")
    except Exception as e:
        db.rollback()
        print(f"Error during database cleanup: {str(e)}")
    finally:
        db.close()

# Schedule the cleanup to run every day at midnight
scheduler.add_job(
    cleanup_old_records,
    'cron',
    hour=0,
    minute=0,
    id='daily_db_cleanup'
)
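The database module is assumed rather than shown here. For reference, a minimal sketch of what it might contain; the connection URL and the names get_db, SessionLocal, and ExpiredRecords are placeholders chosen to match the commented import above:
# database.py (hypothetical module assumed by the cleanup job)
from datetime import datetime
from sqlalchemy import create_engine, Column, Integer, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base

engine = create_engine("sqlite:///app.db")  # placeholder connection URL
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
Base = declarative_base()

class ExpiredRecords(Base):
    __tablename__ = "expired_records"

    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime, default=datetime.utcnow)

def get_db():
    # Yield a session and make sure it is closed afterwards
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

Base.metadata.create_all(bind=engine)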
Advanced: Working with Asynchronous Tasks
APScheduler can also work with asynchronous functions. Here's how you can schedule async tasks:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import aiohttp

# Create an async scheduler (must use AsyncIOScheduler instead of BackgroundScheduler)
async_scheduler = AsyncIOScheduler()

async def fetch_external_api_data():
    print(f"Fetching data from external API at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            data = await response.json()
            print(f"Received {len(data)} records")
            # Process the data here

# Schedule the async job to run every hour
async_scheduler.add_job(
    fetch_external_api_data,
    'interval',
    hours=1,
    id='hourly_api_fetch'
)

@app.on_event("startup")
async def start_async_scheduler():
    # AsyncIOScheduler needs a running event loop, so start it once the app is up
    async_scheduler.start()

@app.on_event("shutdown")
async def shutdown_async_scheduler():
    async_scheduler.shutdown()
Best Practices and Considerations
When using task scheduling in FastAPI, keep these best practices in mind:
- Error Handling: Always wrap your scheduled tasks in try-except blocks so failures are reported the way your application expects, instead of only surfacing in APScheduler's internal logging
- Scheduler Persistence: For production, consider using a persistent job store (e.g., SQLAlchemy)
- Resource Management: Be mindful of resource consumption for frequently scheduled tasks
- Logging: Add proper logging to track the execution of scheduled tasks
- Task Duration: Avoid scheduling short-interval tasks that take longer to execute than the interval itself (see the overlap-protection sketch after the logging example below)
Here's an example with improved error handling and logging:
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

def robust_scheduled_task():
    try:
        logger.info("Starting scheduled task")
        # Your task logic here
        logger.info("Scheduled task completed successfully")
    except Exception as e:
        logger.error(f"Error in scheduled task: {str(e)}", exc_info=True)
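The task-duration point from the list above deserves a concrete illustration. APScheduler lets you cap concurrent runs of a job and control what happens to runs that are missed; the interval and grace period below are arbitrary example values:
# Schedule the robust task every 5 minutes with overlap protection
scheduler.add_job(
    robust_scheduled_task,
    'interval',
    minutes=5,
    id='robust_task',
    max_instances=1,        # never run two instances of this job concurrently
    coalesce=True,          # if several runs were missed, run only once to catch up
    misfire_grace_time=30   # skip a run entirely if it is more than 30 seconds late
)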
Using Job Stores for Persistence
For production environments, it's recommended to use a persistent job store to ensure your jobs survive application restarts:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# Define job stores and executors
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

# Create scheduler with persistence
persistent_scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults
)
persistent_scheduler.start()
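One caveat with persistent stores: jobs are serialized by reference, so the scheduled function must be importable (a module-level function, not a closure), and a job added at startup with a fixed id will already exist in the store after a restart. Passing replace_existing=True lets you re-register such jobs safely every time the app starts:
# Safe to call on every startup: the stored job is replaced instead of raising
# a ConflictingIdError for the duplicate id
persistent_scheduler.add_job(
    log_current_time,
    'interval',
    seconds=30,
    id='log_time_job',
    replace_existing=True
)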
Summary
In this tutorial, we've learned how to implement task scheduling in FastAPI applications using APScheduler. We covered:
- Setting up a scheduler with FastAPI
- Different types of scheduling (interval, cron, one-time)
- Creating API endpoints to manage scheduled tasks
- Implementing practical examples like database cleanup
- Working with asynchronous scheduled tasks
- Best practices for production use
With task scheduling, you can automate recurring operations in your FastAPI applications, making them more efficient and capable of handling time-based workflows.
Exercises
- Email Digest Service: Create a scheduled task that sends a daily digest email to users containing summaries of their activity.
- Cache Invalidation: Implement a scheduled task that clears expired items from a cache every few minutes.
- Health Check Monitor: Build a scheduler that performs health checks on external services and logs their status at regular intervals.
- Dynamic Scheduling: Create an API endpoint that allows users to schedule their own tasks with custom intervals and provide notifications when those tasks complete.
- Scheduler Dashboard: Build a simple web interface that displays all scheduled tasks, their next run times, and allows administrators to pause or resume tasks.
By mastering task scheduling in FastAPI, you'll be able to create more sophisticated applications that can perform time-based operations autonomously.