FastAPI Security Best Practices
Introduction
Security is a critical aspect of any web application, and APIs are no exception. FastAPI provides several built-in features to help you secure your applications, but knowing how to implement them effectively is essential. This guide explores the best practices for securing your FastAPI applications against common vulnerabilities and attacks.
Because APIs often serve as gateways to sensitive data and operations, proper security measures are a necessity, not an option. Whether you're building a small personal project or an enterprise application, these security best practices will help you create more robust and secure FastAPI applications.
Authentication and Authorization
1. Use OAuth2 with Password and Bearer Tokens
FastAPI provides built-in support for OAuth2, a widely adopted authorization framework. The example below uses its password flow, where the client exchanges a username and password for a bearer token:
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: str
    full_name: str
    disabled: bool = False


def fake_decode_token(token):
    # In a real app, you would decode the token
    return User(
        username="johndoe",
        email="johndoe@example.com",
        full_name="John Doe",
    )


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    return user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # In a real app, verify credentials against a database
    if form_data.username != "johndoe" or form_data.password != "secret":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return {"access_token": "fake-token", "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user
In this example:
- We create an OAuth2 password bearer scheme
- Users authenticate using username/password at the /token endpoint
- The token is then used to access protected endpoints
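The login handler above compares credentials with `!=`, which can return early on the first mismatching character. A minimal sketch of a constant-time comparison using only the standard library follows. The names `DEMO_USERNAME`, `DEMO_PASSWORD`, and `credentials_match` are hypothetical, and a real application would compare against a stored password hash rather than a plaintext constant.

```python
import secrets

# Hypothetical demo credentials; a real app would look up a stored password hash.
DEMO_USERNAME = "johndoe"
DEMO_PASSWORD = "secret"


def credentials_match(username: str, password: str) -> bool:
    """Compare both fields in constant time so timing reveals less about the mismatch."""
    # Encode to bytes first: compare_digest only accepts ASCII-only str values.
    username_ok = secrets.compare_digest(
        username.encode("utf-8"), DEMO_USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        password.encode("utf-8"), DEMO_PASSWORD.encode("utf-8")
    )
    # Both comparisons run before the results are combined.
    return username_ok and password_ok
```

Inside the `/token` handler, the condition could then read `if not credentials_match(form_data.username, form_data.password):`.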
2. Implement JWT for Stateless Authentication
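JSON Web Tokens (JWT) provide a stateless way to handle user authentication: the server signs each token and can later verify it without storing session state. Note that the payload is signed, not encrypted, so it should never contain secrets: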
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext

# Secret key and algorithm
SECRET_KEY = "your-secret-key"  # Placeholder only; in production, use a secure random key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Use a timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


# Depends, HTTPException, status, and oauth2_scheme come from the previous example
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # decode() verifies the signature and the "exp" claim
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    # In a real app, get the user from a database (get_user is not defined in this snippet)
    user = get_user(username=username)
    if user is None:
        raise credentials_exception
    return user
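To make the signing and verification steps less opaque, here is a rough standard-library sketch of what an HS256 token looks like under the hood. It is for illustration only and is not a replacement for a maintained JWT library such as the one used above. The helper names `sign_token` and `verify_token` are hypothetical, and the sketch assumes every token carries an `exp` claim expressed as a Unix timestamp.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()


def sign_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature, signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_signature(signing_input, secret))}"


def verify_token(token: str, secret: str) -> dict:
    """Return the payload if the signature is valid and the token has not expired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison of the expected and presented signatures.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    # A missing "exp" claim is treated as already expired in this sketch.
    if payload.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return payload
```

Anyone holding the token can decode the payload, since it is only base64-encoded. What the secret protects is the ability to produce a matching signature.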
3. Role-Based Access Control (RBAC)
Implement role-based permissions to limit access to specific endpoints:
from enum import Enum
from typing import List


class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"


class UserInDB(User):
    password: str
    roles: List[Role]


def get_user_permissions(user: UserInDB = Depends(get_current_user)):
    return user.roles


def check_admin_permission(roles: List[Role] = Depends(get_user_permissions)):
    if Role.ADMIN not in roles:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )
    return True


@app.get("/admin-only")
async def admin_only(is_admin: bool = Depends(check_admin_permission)):
    return {"message": "You have admin access"}
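The admin check above is hard-wired to a single role. One way to generalize it is a small factory that builds a checker for any set of allowed roles. The sketch below is framework-agnostic and uses only the standard library. `require_any_role` is a hypothetical helper, and it raises `PermissionError` where a FastAPI dependency would raise an `HTTPException` with status 403.

```python
from enum import Enum
from typing import Callable, Iterable, Set


class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"


def require_any_role(*allowed: Role) -> Callable[[Iterable[Role]], bool]:
    """Return a checker that passes when the user holds at least one allowed role."""
    allowed_set: Set[Role] = set(allowed)

    def checker(user_roles: Iterable[Role]) -> bool:
        # isdisjoint() is True when the two collections share no role.
        if allowed_set.isdisjoint(user_roles):
            raise PermissionError("Insufficient permissions")
        return True

    return checker


# Build reusable checkers once and call them wherever access is decided.
admin_only_check = require_any_role(Role.ADMIN)
member_check = require_any_role(Role.ADMIN, Role.USER)
```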
Protection Against Common Attacks
1. Cross-Site Scripting (XSS) Protection
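FastAPI returns JSON by default, and the Jinja2Templates helper it inherits from Starlette enables autoescaping, which covers many XSS cases. You should still be careful whenever user input ends up in HTML: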
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from markupsafe import escape

app = FastAPI()
templates = Jinja2Templates(directory="templates")


@app.get("/items/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
    # Escape user input to prevent XSS
    safe_id = escape(id)
    return templates.TemplateResponse("item.html", {"request": request, "id": safe_id})
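When HTML is assembled by hand rather than through a template engine, nothing escapes the input automatically. The following sketch shows the effect of escaping with the standard library's `html.escape`. The function `render_item_heading` is a hypothetical example, not part of FastAPI.

```python
import html


def render_item_heading(item_id: str) -> str:
    """Embed untrusted input in HTML after escaping <, >, &, and quotes."""
    safe_id = html.escape(item_id, quote=True)
    return f"<h1>Item {safe_id}</h1>"


# The script tag is rendered as inert text instead of being executed.
print(render_item_heading("<script>alert(1)</script>"))
# → <h1>Item &lt;script&gt;alert(1)&lt;/script&gt;</h1>
```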
2. SQL Injection Prevention
Always use parameterized queries or an ORM like SQLAlchemy:
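# Unsafe way (DO NOT USE)
# query = f"SELECT * FROM users WHERE username = '{username}'"

# Safe way with SQLAlchemy
from sqlalchemy.orm import Session


def get_user(db: Session, username: str):
    # User here must be a SQLAlchemy model, not the Pydantic model from earlier;
    # the ORM sends the username as a bound parameter.
    return db.query(User).filter(User.username == username).first()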
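The same principle applies without an ORM. As a small sketch using the standard library's `sqlite3` module, the query below passes the username as a bound parameter, so an injection attempt is treated as a literal string. The table layout and the helpers `make_demo_db` and `find_user` are assumptions made for this illustration.

```python
import sqlite3
from typing import Optional, Tuple


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with one hypothetical user row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (username TEXT PRIMARY KEY, email TEXT)")
    conn.execute(
        "INSERT INTO users (username, email) VALUES (?, ?)",
        ("johndoe", "johndoe@example.com"),
    )
    return conn


def find_user(conn: sqlite3.Connection, username: str) -> Optional[Tuple[str, str]]:
    # The ? placeholder keeps the value out of the SQL text entirely.
    cursor = conn.execute(
        "SELECT username, email FROM users WHERE username = ?", (username,)
    )
    return cursor.fetchone()


conn = make_demo_db()
print(find_user(conn, "johndoe"))       # the matching row
print(find_user(conn, "' OR '1'='1"))   # None: the payload matches no username
```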
3. Rate Limiting
Implement rate limiting to slow down brute-force attacks and other abuse. The example uses the slowapi package:
from fastapi import FastAPI, Depends, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request
# Set up rate limiter
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/limited-endpoint")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
    return {"message": "This endpoint is rate limited"}
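To show the idea behind a limit such as "5/minute", here is a minimal in-memory sliding-window limiter written with the standard library. It is a sketch for understanding only, not how slowapi is implemented. `SlidingWindowLimiter` is a hypothetical class that keeps its state in one process, so it would not work across multiple workers.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per key within the last window_seconds."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max = max_requests
        self._window = window_seconds
        self._clock = clock  # Injectable so the behavior can be tested without sleeping
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self._window:
            hits.popleft()
        if len(hits) >= self._max:
            return False
        hits.append(now)
        return True


limiter_demo = SlidingWindowLimiter(max_requests=5, window_seconds=60)
results = [limiter_demo.allow("203.0.113.7") for _ in range(6)]
print(results)  # the sixth request within the window is rejected
```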
Secure Configuration
1. Environment Variables for Sensitive Data
Never hardcode sensitive information. Use environment variables:
import os
from fastapi import FastAPI
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Get sensitive data from environment
DATABASE_URL = os.getenv("DATABASE_URL")
SECRET_KEY = os.getenv("SECRET_KEY")
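Because `os.getenv` returns `None` when a variable is missing, a misconfigured deployment could start with no secret at all. A possible safeguard, sketched with the standard library, is to fail fast at startup. `require_env` and `generate_secret_key` are hypothetical helper names.

```python
import os
import secrets


def require_env(name: str) -> str:
    """Return the variable's value, or stop startup if it is missing or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def generate_secret_key(num_bytes: int = 32) -> str:
    """Generate a random hex key suitable for pasting into a .env file."""
    return secrets.token_hex(num_bytes)


# Run once to create a key, then store it outside the codebase.
print(len(generate_secret_key()))  # → 64
```

At startup, `SECRET_KEY = require_env("SECRET_KEY")` would then raise immediately instead of silently signing tokens with `None`.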
2. CORS Configuration
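Configure CORS (Cross-Origin Resource Sharing) so that only origins you trust can call your API from a browser. List those origins explicitly rather than using a wildcard, especially when credentials are allowed: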
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://trusted-domain.com"],  # Specify trusted domains
    allow_credentials=True,
    allow_methods=["*"],  # Or specify HTTP methods: ["GET", "POST"]
    allow_headers=["*"],  # Or specify allowed headers
)
3. HTTP Security Headers
Add security headers to your responses:
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
app = FastAPI()
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        # X-XSS-Protection is deprecated; "0" disables the legacy browser filter,
        # which could itself introduce vulnerabilities
        response.headers["X-XSS-Protection"] = "0"
        # HSTS only takes effect when the site is served over HTTPS
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        return response


app.add_middleware(SecurityHeadersMiddleware)
Dependency Scanning and Updates
1. Regular Dependency Updates
Keep your dependencies up-to-date to avoid known security vulnerabilities:
# Check for outdated dependencies
pip list --outdated
# Update dependencies
pip install --upgrade package-name
# Use tools like safety to check for vulnerabilities
pip install safety
safety check
2. Use a Virtual Environment
Always use a virtual environment to isolate your project dependencies:
# Create a virtual environment
python -m venv venv
# Activate the virtual environment
# On Windows:
venv\Scripts\activate
# On Unix or MacOS:
source venv/bin/activate
Data Validation
FastAPI uses Pydantic for data validation, which helps prevent many security issues:
# Note: validator is the Pydantic v1 API; Pydantic v2 provides field_validator instead.
# EmailStr requires the email-validator package.
from pydantic import BaseModel, EmailStr, validator


class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

    # Add custom validators
    @validator('password')
    def password_must_be_strong(cls, v):
        if len(v) < 8:
            raise ValueError('Password must be at least 8 characters')
        if not any(char.isdigit() for char in v):
            raise ValueError('Password must contain at least one digit')
        if not any(char.isupper() for char in v):
            raise ValueError('Password must contain at least one uppercase letter')
        return v


@app.post("/users/")
async def create_user(user: UserCreate):
    # The input is already validated by Pydantic
    return {"username": user.username, "email": user.email}
Logging and Monitoring
1. Structured Logging
Implement structured logging for better security monitoring:
import logging
from fastapi import FastAPI, Request
import time
import uuid
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

app = FastAPI()


@app.middleware("http")
async def log_requests(request: Request, call_next):
    request_id = str(uuid.uuid4())
    start_time = time.time()

    # Log request details
    logger.info(f"Request started | ID: {request_id} | Method: {request.method} | Path: {request.url.path}")

    response = await call_next(request)

    # Log response details
    process_time = time.time() - start_time
    logger.info(f"Request completed | ID: {request_id} | Status: {response.status_code} | Time: {process_time:.5f}s")

    return response
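The log lines above are delimited strings, which downstream tools have to parse by convention. If machine-readable output is preferred, one option is a formatter that emits each record as a JSON object. The sketch below uses only the standard library. `JsonFormatter` and the extra field names (`request_id`, `method`, `path`, `status_code`) are assumptions chosen to mirror the middleware.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    # Optional fields that callers may attach through the `extra` argument.
    EXTRA_FIELDS = ("request_id", "method", "path", "status_code")

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in self.EXTRA_FIELDS:
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        return json.dumps(entry)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
json_logger = logging.getLogger("app.requests")
json_logger.addHandler(handler)
json_logger.setLevel(logging.INFO)

# Values passed via `extra` become attributes on the record.
json_logger.info("Request started", extra={"request_id": "abc-123", "method": "GET", "path": "/items"})
```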
2. Security Monitoring
Implement monitoring for suspicious activities:
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi.security import APIKeyHeader
import logging
app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
logger = logging.getLogger(__name__)
@app.middleware("http")
async def monitor_suspicious_activity(request: Request, call_next):
    # request.client can be None, so fall back to a placeholder value
    client_host = request.client.host if request.client else "unknown"

    # Check for suspicious patterns in request
    if "admin" in request.url.path and client_host not in ["trusted-ip-1", "trusted-ip-2"]:
        logger.warning(f"Suspicious admin access attempt from IP: {client_host}")

    response = await call_next(request)

    # Monitor for potential data leakage in responses
    if response.status_code == 200 and "sensitive" in request.url.path:
        logger.info(f"Sensitive data accessed by IP: {client_host}")

    return response
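Matching the client host against a list of literal strings breaks down once trusted clients span whole subnets. A possible refinement, sketched with the standard library's `ipaddress` module, is to check membership in CIDR ranges. The networks in `TRUSTED_NETWORKS` and the helper `is_trusted_client` are placeholders for illustration.

```python
import ipaddress
from typing import Optional, Sequence, Union

Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]

# Hypothetical trusted ranges; replace with your own networks.
TRUSTED_NETWORKS: Sequence[Network] = (
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("192.168.1.0/24"),
)


def is_trusted_client(
    host: Optional[str], networks: Sequence[Network] = TRUSTED_NETWORKS
) -> bool:
    """Return True only when host parses as an IP inside a trusted network."""
    if host is None:
        return False
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Hostnames or malformed values are never treated as trusted.
        return False
    return any(address in network for network in networks)


print(is_trusted_client("10.1.2.3"))     # → True
print(is_trusted_client("203.0.113.7"))  # → False
```

If the application runs behind a reverse proxy, the address seen by the application is the proxy's, so the check is only meaningful once the real client address is resolved reliably.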
Real-world Application: Secure API with Complete Authentication
Here's a more complete example of a secure FastAPI application with JWT authentication:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
from passlib.context import CryptContext

# Security configuration
SECRET_KEY = "SECRET_KEY_CHANGE_IN_PRODUCTION"  # In production, use os.getenv()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

app = FastAPI()


# Models
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


# Mock database
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret"),
        "disabled": False,
    }
}

# Authentication utilities
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
    # Returns None when the user does not exist
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Use a timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# Routes
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/secure-data")
async def read_secure_data(current_user: User = Depends(get_current_active_user)):
    return {"message": "This is secure data!", "owner": current_user.username}
Summary
In this guide, we've covered essential FastAPI security best practices:
- Authentication and Authorization
  - Using OAuth2 with JWT tokens
  - Implementing role-based access control
- Protection Against Common Attacks
  - XSS protection
  - SQL injection prevention
  - Rate limiting
- Secure Configuration
  - Environment variables for sensitive data
  - CORS configuration
  - Security headers
- Dependency Management
  - Regular updates
  - Virtual environments
- Data Validation
  - Using Pydantic models
  - Custom validators
- Logging and Monitoring
  - Structured logging
  - Security monitoring
By implementing these security best practices, you'll significantly improve the security posture of your FastAPI applications and protect them against common vulnerabilities.
Additional Resources
- FastAPI Security Documentation
- OWASP API Security Top 10
- JWT.io - Debugger and information about JSON Web Tokens
- Python Security Best Practices
Practice Exercises
- Basic Authentication System: Implement a FastAPI application with basic username/password authentication.
- JWT Authentication: Extend the basic authentication to use JWT tokens with proper expiration handling.
- Role-Based Access Control: Create a system with different user roles and permission levels.
- Rate Limiting: Implement rate limiting on critical endpoints to prevent abuse.
- Security Headers Middleware: Create a middleware that adds all necessary security headers to responses.
- Password Reset Flow: Design a secure password reset flow with temporary tokens and email verification.
Remember, security is not a one-time implementation but an ongoing process. Regularly review and update your security practices to protect against emerging threats.