Skip to main content

FastAPI Security Best Practices

Introduction

Security is a critical aspect of any web application, and APIs are no exception. FastAPI provides several built-in features to help you secure your applications, but knowing how to implement them effectively is essential. This guide explores the best practices for securing your FastAPI applications against common vulnerabilities and attacks.

As APIs often serve as gateways to sensitive data and operations, implementing proper security measures is not optional—it's necessary. Whether you're building a small personal project or an enterprise application, these security best practices will help you create more robust and secure FastAPI applications.

Authentication and Authorization

1. Use OAuth2 with Password and Bearer Tokens

FastAPI provides built-in support for OAuth2, which is a widely adopted protocol for authentication and authorization.

python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
username: str
email: str
full_name: str
disabled: bool = False

def fake_decode_token(token):
# In a real app, you would decode the token
return User(
username="johndoe",
email="[email protected]",
full_name="John Doe"
)

async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
return user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# In a real app, verify credentials against a database
if form_data.username != "johndoe" or form_data.password != "secret":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
return {"access_token": "fake-token", "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user

In this example:

  • We create an OAuth2 password bearer scheme
  • Users authenticate using username/password at the /token endpoint
  • The token is then used to access protected endpoints

2. Implement JWT for Stateless Authentication

JSON Web Tokens (JWT) provide a secure, stateless way to handle user authentication:

python
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext

# Secret key and algorithm
SECRET_KEY = "your-secret-key" # In production, use a secure random key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# In a real app, get the user from a database
user = get_user(username=username)
if user is None:
raise credentials_exception
return user

3. Role-Based Access Control (RBAC)

Implement role-based permissions to limit access to specific endpoints:

python
from enum import Enum
from typing import List

class Role(str, Enum):
ADMIN = "admin"
USER = "user"
GUEST = "guest"

class UserInDB(User):
password: str
roles: List[Role]

def get_user_permissions(user: UserInDB = Depends(get_current_user)):
return user.roles

def check_admin_permission(roles: List[Role] = Depends(get_user_permissions)):
if Role.ADMIN not in roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return True

@app.get("/admin-only")
async def admin_only(is_admin: bool = Depends(check_admin_permission)):
return {"message": "You have admin access"}

Protection Against Common Attacks

1. Cross-Site Scripting (XSS) Protection

FastAPI uses Starlette's built-in protection mechanisms, but you should still be careful with user input:

python
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from markupsafe import escape

app = FastAPI()
templates = Jinja2Templates(directory="templates")

@app.get("/items/{id}", response_class=HTMLResponse)
async def read_item(request: Request, id: str):
# Escape user input to prevent XSS
safe_id = escape(id)
return templates.TemplateResponse("item.html", {"request": request, "id": safe_id})

2. SQL Injection Prevention

Always use parameterized queries or an ORM like SQLAlchemy:

python
# Unsafe way (DO NOT USE)
# query = f"SELECT * FROM users WHERE username = '{username}'"

# Safe way with SQLAlchemy
from sqlalchemy.orm import Session

def get_user(db: Session, username: str):
return db.query(User).filter(User.username == username).first()

3. Rate Limiting

Implement rate limiting to prevent brute force attacks:

python
from fastapi import FastAPI, Depends, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.requests import Request

# Set up rate limiter
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/limited-endpoint")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
return {"message": "This endpoint is rate limited"}

Secure Configuration

1. Environment Variables for Sensitive Data

Never hardcode sensitive information. Use environment variables:

python
import os
from fastapi import FastAPI
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Get sensitive data from environment
DATABASE_URL = os.getenv("DATABASE_URL")
SECRET_KEY = os.getenv("SECRET_KEY")

2. CORS Configuration

Configure CORS (Cross-Origin Resource Sharing) properly:

python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["https://trusted-domain.com"], # Specify trusted domains
allow_credentials=True,
allow_methods=["*"], # Or specify HTTP methods: ["GET", "POST"]
allow_headers=["*"], # Or specify allowed headers
)

3. HTTP Security Headers

Add security headers to your responses:

python
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response

app = FastAPI()

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
return response

app.add_middleware(SecurityHeadersMiddleware)

Dependency Scanning and Updates

1. Regular Dependency Updates

Keep your dependencies up-to-date to avoid known security vulnerabilities:

bash
# Check for outdated dependencies
pip list --outdated

# Update dependencies
pip install --upgrade package-name

# Use tools like safety to check for vulnerabilities
pip install safety
safety check

2. Use a Virtual Environment

Always use a virtual environment to isolate your project dependencies:

bash
# Create a virtual environment
python -m venv venv

# Activate the virtual environment
# On Windows:
venv\Scripts\activate
# On Unix or MacOS:
source venv/bin/activate

Data Validation

FastAPI uses Pydantic for data validation, which helps prevent many security issues:

python
from pydantic import BaseModel, EmailStr, validator

class UserCreate(BaseModel):
username: str
email: EmailStr
password: str

# Add custom validators
@validator('password')
def password_must_be_strong(cls, v):
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not any(char.isdigit() for char in v):
raise ValueError('Password must contain at least one digit')
if not any(char.isupper() for char in v):
raise ValueError('Password must contain at least one uppercase letter')
return v

@app.post("/users/")
async def create_user(user: UserCreate):
# The input is already validated by Pydantic
return {"username": user.username, "email": user.email}

Logging and Monitoring

1. Structured Logging

Implement structured logging for better security monitoring:

python
import logging
from fastapi import FastAPI, Request
import time
import uuid

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

app = FastAPI()

@app.middleware("http")
async def log_requests(request: Request, call_next):
request_id = str(uuid.uuid4())
start_time = time.time()

# Log request details
logger.info(f"Request started | ID: {request_id} | Method: {request.method} | Path: {request.url.path}")

response = await call_next(request)

# Log response details
process_time = time.time() - start_time
logger.info(f"Request completed | ID: {request_id} | Status: {response.status_code} | Time: {process_time:.5f}s")

return response

2. Security Monitoring

Implement monitoring for suspicious activities:

python
from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi.security import APIKeyHeader
import logging

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
logger = logging.getLogger(__name__)

@app.middleware("http")
async def monitor_suspicious_activity(request: Request, call_next):
# Check for suspicious patterns in request
if "admin" in request.url.path and request.client.host not in ["trusted-ip-1", "trusted-ip-2"]:
logger.warning(f"Suspicious admin access attempt from IP: {request.client.host}")

response = await call_next(request)

# Monitor for potential data leakage in responses
if response.status_code == 200 and "sensitive" in request.url.path:
logger.info(f"Sensitive data accessed by IP: {request.client.host}")

return response

Real-world Application: Secure API with Complete Authentication

Here's a more complete example of a secure FastAPI application with JWT authentication:

python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from typing import Optional, List
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext

# Security configuration
SECRET_KEY = "SECRET_KEY_CHANGE_IN_PRODUCTION" # In production, use os.getenv()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

app = FastAPI()

# Models
class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: Optional[str] = None

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None

class UserInDB(User):
hashed_password: str

# Mock database
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": pwd_context.hash("secret"),
"disabled": False,
}
}

# Authentication utilities
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

# Routes
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user

@app.get("/secure-data")
async def read_secure_data(current_user: User = Depends(get_current_active_user)):
return {"message": "This is secure data!", "owner": current_user.username}

Summary

In this guide, we've covered essential FastAPI security best practices:

  1. Authentication and Authorization

    • Using OAuth2 with JWT tokens
    • Implementing role-based access control
  2. Protection Against Common Attacks

    • XSS protection
    • SQL injection prevention
    • Rate limiting
  3. Secure Configuration

    • Environment variables for sensitive data
    • CORS configuration
    • Security headers
  4. Dependency Management

    • Regular updates
    • Virtual environments
  5. Data Validation

    • Using Pydantic models
    • Custom validators
  6. Logging and Monitoring

    • Structured logging
    • Security monitoring

By implementing these security best practices, you'll significantly improve the security posture of your FastAPI applications and protect them against common vulnerabilities.

Additional Resources

Practice Exercises

  1. Basic Authentication System: Implement a FastAPI application with basic username/password authentication.

  2. JWT Authentication: Extend the basic authentication to use JWT tokens with proper expiration handling.

  3. Role-Based Access Control: Create a system with different user roles and permission levels.

  4. Rate Limiting: Implement rate limiting on critical endpoints to prevent abuse.

  5. Security Headers Middleware: Create a middleware that adds all necessary security headers to responses.

  6. Password Reset Flow: Design a secure password reset flow with temporary tokens and email verification.

Remember, security is not a one-time implementation but an ongoing process. Regularly review and update your security practices to protect against emerging threats.



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)