FastAPI Authentication Middleware

Authentication is a critical aspect of web applications, ensuring that users are who they claim to be before granting access to protected resources. In FastAPI, middleware provides an elegant way to handle authentication across multiple routes without duplicating code.

Introduction to Authentication Middleware

Authentication middleware acts as a gatekeeper for your API endpoints. It intercepts incoming requests before they reach your route handlers, verifies user credentials, and either allows the request to proceed or rejects it with an appropriate error message.

Unlike per-route security built on FastAPI's security utilities, which you declare on each route or router that needs them, middleware runs on every incoming request, so the same authentication logic applies consistently across many routes or your entire application.

Understanding Middleware in FastAPI

Before diving into authentication-specific middleware, let's recap how middleware works in FastAPI:

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def example_middleware(request: Request, call_next):
    # Before request processing
    print("Request received")

    # Process the request
    response = await call_next(request)

    # After request processing
    print("Response sent")

    return response

The middleware receives the incoming request, performs some pre-processing, passes the request to the next middleware or route handler, then performs some post-processing before returning the response.
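To make the before/after ordering concrete, here is a minimal sketch that uses only the standard library, with no FastAPI involved. The names `make_middleware`, `build_stack`, and `run_demo` are hypothetical and exist only for this illustration; requests and responses are plain strings. It shows that each layer runs its "before" step on the way in and its "after" step on the way out.

```python
import asyncio
from typing import Awaitable, Callable, List

Handler = Callable[[str], Awaitable[str]]

def make_middleware(name: str, log: List[str]):
    """Build a middleware that records when it runs relative to the handler."""
    async def middleware(request: str, call_next: Handler) -> str:
        log.append(f"{name}: before")
        response = await call_next(request)
        log.append(f"{name}: after")
        return response
    return middleware

def build_stack(handler: Handler, middlewares) -> Handler:
    """Wrap the handler so the first middleware in the list is the outermost."""
    stack = handler
    for mw in reversed(middlewares):
        # Bind the current values so each layer calls the layer beneath it.
        stack = (lambda m, nxt: lambda request: m(request, nxt))(mw, stack)
    return stack

def run_demo() -> List[str]:
    log: List[str] = []

    async def handler(request: str) -> str:
        log.append("handler")
        return f"response to {request}"

    app = build_stack(
        handler,
        [make_middleware("outer", log), make_middleware("inner", log)],
    )
    asyncio.run(app("GET /"))
    return log

print(run_demo())
```

In FastAPI itself, each newly registered middleware wraps the existing application, so the last one added is the outermost layer.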

Basic Authentication Middleware

Let's create a simple authentication middleware that checks for an API key in the request headers:

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

API_KEY = "secret_api_key_12345"

@app.middleware("http")
async def authenticate_request(request: Request, call_next):
    if request.headers.get("X-API-Key") != API_KEY:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid API Key"}
        )

    response = await call_next(request)
    return response

@app.get("/protected-resource/")
async def get_protected_resource():
    return {"message": "You accessed the protected resource!"}

@app.get("/public-resource/")
async def get_public_resource():
    return {"message": "This is a public resource."}
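A plain `!=` comparison can, in principle, leak timing information about how much of the key matched. As a hedged alternative, the sketch below compares the key with `secrets.compare_digest` from the standard library. The helper name `api_key_matches` is hypothetical and not part of FastAPI.

```python
import secrets
from typing import Optional

def api_key_matches(provided: Optional[str], expected: str) -> bool:
    """Compare an API key in constant time; a missing header never matches."""
    if provided is None:
        return False
    # Encode to bytes so non-ASCII input is handled instead of raising TypeError.
    return secrets.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    )

print(api_key_matches("secret_api_key_12345", "secret_api_key_12345"))
print(api_key_matches(None, "secret_api_key_12345"))
```

Inside the middleware, the check would then read `if not api_key_matches(request.headers.get("X-API-Key"), API_KEY):`.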

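The middleware returns a JSONResponse directly instead of raising HTTPException, because exceptions raised inside HTTP middleware are not processed by FastAPI's HTTPException handler. However, this middleware applies to every route, so even /public-resource/ and the interactive documentation at /docs now require the key. What if we want some routes to be public?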

Selective Authentication Middleware

We can modify our middleware to only apply authentication to certain paths:

python
@app.middleware("http")
async def selective_authentication(request: Request, call_next):
    # List of paths that require authentication
    protected_paths = ["/protected-resource/", "/admin/", "/user-profile/"]

    # Check if the current path requires authentication
    if any(request.url.path.startswith(path) for path in protected_paths):
        if request.headers.get("X-API-Key") != API_KEY:
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid API Key"}
            )

    response = await call_next(request)
    return response

This way, only routes whose paths start with one of the listed prefixes require authentication; every other path passes straight through to its handler.
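Prefix checks are easy to get subtly wrong, so it can help to pull the decision into a small pure function that is testable without running a server. The sketch below is one possible approach under stated assumptions: `PROTECTED_PREFIXES` and `requires_auth` are hypothetical names, and prefixes are stored without a trailing slash so that `/admin` and `/admin/` are treated the same while `/administrator` is not matched.

```python
from typing import Iterable

# Hypothetical configuration: prefixes are written without a trailing slash.
PROTECTED_PREFIXES = ("/protected-resource", "/admin", "/user-profile")

def requires_auth(path: str, prefixes: Iterable[str] = PROTECTED_PREFIXES) -> bool:
    """Return True if the path equals a protected prefix or lies beneath it."""
    normalized = path.rstrip("/") or "/"
    return any(
        normalized == prefix or normalized.startswith(prefix + "/")
        for prefix in prefixes
    )

for candidate in ("/admin", "/admin/users", "/administrator", "/public-resource/"):
    print(candidate, requires_auth(candidate))
```

Matching on whole path segments is a design choice: whether `/admin` without the trailing slash ever reaches a handler depends on how the application handles trailing-slash redirects.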

JWT Authentication Middleware

For more complex authentication scenarios, JSON Web Tokens (JWTs) are commonly used. Here's how to implement JWT authentication as middleware, using the PyJWT package (imported as jwt):

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import jwt  # Provided by the PyJWT package
from datetime import datetime, timedelta, timezone
from typing import Optional

app = FastAPI()

SECRET_KEY = "your_secret_key"  # Demonstration value only
ALGORITHM = "HS256"

# Helper function to create a new JWT token
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.middleware("http")
async def jwt_authentication_middleware(request: Request, call_next):
    protected_paths = ["/protected/", "/admin/"]

    if any(request.url.path.startswith(path) for path in protected_paths):
        authorization = request.headers.get("Authorization")
        if not authorization:
            return JSONResponse(
                status_code=401,
                content={"detail": "Authorization header missing"}
            )

        # partition() never raises, unlike unpacking the result of split(),
        # which fails when the header does not contain exactly two parts
        scheme, _, token = authorization.partition(" ")
        token = token.strip()
        if scheme.lower() != "bearer" or not token:
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid authentication scheme"}
            )

        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            # Add user info to request state
            request.state.user = payload.get("sub")
        except jwt.PyJWTError:
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid token"}
            )

    return await call_next(request)

@app.get("/token/")
async def get_token():
    # This would normally validate username/password
    # Here we're just creating a token for demonstration
    access_token = create_access_token(
        data={"sub": "[email protected]"},
        expires_delta=timedelta(minutes=30)
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/protected/")
async def protected_route(request: Request):
    return {
        "message": "You accessed a protected route",
        "user": request.state.user
    }

@app.get("/public/")
async def public_route():
    return {"message": "This is a public route"}
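Parsing the Authorization header is the part of this middleware that is easiest to test in isolation. The following is a minimal sketch, using only the standard library, of a helper that returns the token for a well-formed `Bearer <token>` value and `None` for anything else. The name `extract_bearer_token` is hypothetical and is not a FastAPI or PyJWT function.

```python
from typing import Optional

def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from a 'Bearer <token>' header value, or None if malformed."""
    if not authorization:
        return None
    # partition() always returns three parts, so no unpacking error is possible.
    scheme, _, token = authorization.strip().partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token or " " in token:
        return None
    return token

print(extract_bearer_token("Bearer abc.def.ghi"))
print(extract_bearer_token("Basic dXNlcjpwYXNz"))
```

A middleware can then respond with 401 whenever the helper returns `None`, which keeps every malformed-header case on a single code path.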

Real-World Example: Role-Based Authentication Middleware

In production applications, you might need to check not only if a user is authenticated but also if they have the required role to access certain endpoints. Here's a more elaborate example:

python
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import jwt  # Provided by the PyJWT package
from datetime import datetime, timedelta, timezone
from typing import Optional

app = FastAPI()

SECRET_KEY = "your_secret_key"  # Demonstration value only
ALGORITHM = "HS256"

# Define role permissions
ROLE_PERMISSIONS = {
    "admin": ["read", "write", "delete"],
    "editor": ["read", "write"],
    "viewer": ["read"]
}

# Function to create token with role
def create_access_token(username: str, role: str, expires_delta: Optional[timedelta] = None):
    to_encode = {"sub": username, "role": role}
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

@app.middleware("http")
async def role_based_auth_middleware(request: Request, call_next):
    # Define which paths need which roles
    path_permissions = {
        "/admin/": ["admin"],
        "/content/edit/": ["admin", "editor"],
        "/content/view/": ["admin", "editor", "viewer"]
    }

    # Find if the current path requires permissions
    required_roles = None
    for path, roles in path_permissions.items():
        if request.url.path.startswith(path):
            required_roles = roles
            break

    # If path requires authentication
    if required_roles:
        authorization = request.headers.get("Authorization")
        if not authorization:
            return JSONResponse(
                status_code=401,
                content={"detail": "Authorization header missing"}
            )

        # partition() never raises, unlike unpacking the result of split(),
        # which fails when the header does not contain exactly two parts
        scheme, _, token = authorization.partition(" ")
        token = token.strip()
        if scheme.lower() != "bearer" or not token:
            return JSONResponse(
                status_code=401,
                content={"detail": "Invalid authentication scheme"}
            )

        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.PyJWTError as e:
            return JSONResponse(
                status_code=401,
                content={"detail": f"Invalid token: {str(e)}"}
            )

        user_role = payload.get("role")
        if user_role not in required_roles:
            return JSONResponse(
                status_code=403,
                content={"detail": f"Access denied. Required roles: {required_roles}"}
            )

        # Add user info to request state for use in route handlers
        request.state.user = {
            "username": payload.get("sub"),
            "role": user_role
        }

    return await call_next(request)

# Routes for token generation (for demonstration)
@app.get("/token/{role}")
async def get_token(role: str):
    if role not in ROLE_PERMISSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid role. Choose from: {list(ROLE_PERMISSIONS.keys())}"
        )

    access_token = create_access_token(
        username="[email protected]",
        role=role,
        expires_delta=timedelta(minutes=30)
    )
    return {"access_token": access_token, "token_type": "bearer", "role": role}

# Protected routes
@app.get("/admin/")
async def admin_route(request: Request):
    return {
        "message": "You accessed the admin route",
        "user": request.state.user
    }

@app.get("/content/edit/")
async def edit_content(request: Request):
    return {
        "message": "You can edit content",
        "user": request.state.user
    }

@app.get("/content/view/")
async def view_content(request: Request):
    return {
        "message": "You can view content",
        "user": request.state.user
    }

@app.get("/public/")
async def public_route():
    return {"message": "This is a public route"}
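The example above defines `ROLE_PERMISSIONS` but only uses its keys to validate role names. As one possible way to put the permission lists to work, the sketch below checks a role against the permission an HTTP method would need. The `METHOD_PERMISSIONS` mapping and the helper names are assumptions made for this illustration, not part of the original example, and the code uses only the standard library.

```python
from typing import Optional

ROLE_PERMISSIONS = {
    "admin": ["read", "write", "delete"],
    "editor": ["read", "write"],
    "viewer": ["read"],
}

# Assumption for illustration: which permission each HTTP method requires.
METHOD_PERMISSIONS = {
    "GET": "read",
    "HEAD": "read",
    "POST": "write",
    "PUT": "write",
    "PATCH": "write",
    "DELETE": "delete",
}

def has_permission(role: Optional[str], permission: str) -> bool:
    """Return True if the role exists and grants the permission."""
    return permission in ROLE_PERMISSIONS.get(role or "", [])

def is_request_allowed(role: Optional[str], method: str) -> bool:
    """Deny by default: unknown methods and unknown roles are rejected."""
    required = METHOD_PERMISSIONS.get(method.upper())
    if required is None:
        return False
    return has_permission(role, required)

print(is_request_allowed("editor", "POST"))
print(is_request_allowed("viewer", "DELETE"))
```

Denying unknown roles and methods by default means a typo in configuration fails closed rather than open.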

Testing Authentication Middleware

To test our authentication middleware, we can use tools like curl or Postman. Here's an example using curl:

  1. First, get a token:

bash
$ curl -X GET "http://localhost:8000/token/admin"
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", "token_type":"bearer", "role":"admin"}

  2. Then use the token to access a protected route:

bash
$ curl -X GET "http://localhost:8000/admin/" -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
{"message":"You accessed the admin route","user":{"username":"[email protected]","role":"admin"}}

  3. Try to access a protected route without a token:

bash
$ curl -X GET "http://localhost:8000/admin/"
{"detail":"Authorization header missing"}
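When a request is rejected unexpectedly, it often helps to look inside the token. A JWT consists of three base64url segments separated by dots (header, payload, signature), and the first two are plain JSON. The sketch below decodes them with the standard library only. The function names are hypothetical, and nothing here verifies the signature, so this is suitable for debugging but never for authentication.

```python
import base64
import json

def decode_segment(segment: str) -> dict:
    """Decode one base64url-encoded JSON segment of a JWT."""
    # JWTs strip the '=' padding, so restore it before decoding.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

def peek_jwt_payload(token: str) -> dict:
    """Return the claims of a JWT WITHOUT verifying it (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected three dot-separated segments")
    return decode_segment(parts[1])

# The header segment shown in the curl output above:
print(decode_segment("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"))
```

The visible prefix of the token in the curl output decodes to the header `{"alg": "HS256", "typ": "JWT"}`, which matches the `ALGORITHM` setting used when the token was created.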

Best Practices for Authentication Middleware

  1. Use HTTPS: Always serve your API over HTTPS to prevent token interception.

  2. Use Short-lived Tokens: Set reasonable expiration times for your tokens.

  3. Store Secrets Securely: Never hardcode secrets in your code. Use environment variables or a secure secrets manager.

  4. Implement Rate Limiting: Protect against brute force attacks by implementing rate limiting.

  5. Log Authentication Failures: Keep track of failed authentication attempts for security monitoring.

  6. Include Refresh Token Mechanism: For long-lived sessions, implement a refresh token system.
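As an illustration of the "Store Secrets Securely" point, the following sketch loads the signing key from an environment variable and refuses to start if it is missing or too short. The variable name `JWT_SECRET_KEY` and the helper `load_secret_key` are assumptions for this example. The 32-byte minimum reflects the JWA requirement that HS256 keys be at least as long as the 256-bit hash output.

```python
import os

def load_secret_key(var_name: str = "JWT_SECRET_KEY", min_bytes: int = 32) -> str:
    """Read the signing key from the environment, failing fast if unusable."""
    secret = os.environ.get(var_name)
    if not secret:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    if len(secret.encode("utf-8")) < min_bytes:
        raise RuntimeError(
            f"{var_name} must be at least {min_bytes} bytes long"
        )
    return secret

# Example usage with a throwaway value set for this process only.
os.environ["JWT_SECRET_KEY"] = "x" * 32
print(len(load_secret_key()))
```

Failing at startup surfaces a misconfiguration immediately instead of at the first authenticated request.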

Let's see an example of adding basic rate limiting to our authentication middleware:

python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import time
from collections import defaultdict

app = FastAPI()

# Simple in-memory rate limiter (per process; state is lost on restart)
class RateLimiter:
    def __init__(self, max_requests=5, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.request_counts = defaultdict(list)

    def is_rate_limited(self, ip_address):
        current_time = time.time()
        # Remove old requests
        self.request_counts[ip_address] = [
            timestamp for timestamp in self.request_counts[ip_address]
            if current_time - timestamp < self.window_seconds
        ]

        # Check if too many requests
        if len(self.request_counts[ip_address]) >= self.max_requests:
            return True

        # Add current request timestamp
        self.request_counts[ip_address].append(current_time)
        return False

rate_limiter = RateLimiter(max_requests=5, window_seconds=60)

@app.middleware("http")
async def auth_with_rate_limiting(request: Request, call_next):
    # Only rate limit authentication attempts; a prefix match also covers
    # parameterized routes such as /token/{role}
    if request.url.path.startswith("/token/"):
        # request.client can be None, for example under some test clients
        client_ip = request.client.host if request.client else "unknown"

        if rate_limiter.is_rate_limited(client_ip):
            return JSONResponse(
                status_code=429,
                content={"detail": "Too many authentication attempts. Try again later."}
            )

    # Your regular authentication middleware code here...
    return await call_next(request)
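A sliding-window limiter like the one above can be hard to test because it reads the real clock. The sketch below is a variant under stated assumptions: the class name `SlidingWindowLimiter` and its `check` method are hypothetical, the clock is injectable so tests can control time, and `check` returns how many seconds the caller should wait, which a middleware could send back in a `Retry-After` header. It uses only the standard library.

```python
import time
from collections import defaultdict, deque
from typing import Callable

class SlidingWindowLimiter:
    """Allow at most max_requests per key within any window_seconds span."""

    def __init__(
        self,
        max_requests: int = 5,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits = defaultdict(deque)

    def check(self, key: str) -> float:
        """Return 0.0 and record the hit if allowed, else seconds to wait."""
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have aged out of the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            # The oldest hit determines when the next slot frees up.
            return self.window_seconds - (now - hits[0])
        hits.append(now)
        return 0.0

limiter = SlidingWindowLimiter(max_requests=2, window_seconds=10)
print(limiter.check("203.0.113.7"))
```

Using a monotonic clock by default avoids surprises when the system time is adjusted; rejected attempts are deliberately not recorded, matching the behavior of the limiter above.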

Summary

Authentication middleware in FastAPI provides a powerful way to secure your API by centralizing authentication logic. We've covered:

  1. How to implement basic API key authentication
  2. Selective route protection
  3. JWT-based authentication
  4. Role-based access control
  5. Testing authentication middleware
  6. Best practices including rate limiting

By implementing proper authentication middleware, you can ensure that your FastAPI applications remain secure while maintaining clean, DRY code.

Exercises

  1. Modify the JWT authentication middleware to read the secret key from an environment variable.

  2. Implement a middleware that combines authentication with logging functionality, recording successful and failed authentication attempts.

  3. Create a middleware that can authenticate users from either JWT tokens or API keys, depending on what's provided in the request.

  4. Extend the role-based middleware to support fine-grained permissions (e.g., "read", "write", "delete") rather than just roles.

  5. Implement a more sophisticated rate limiter that uses Redis to track authentication attempts across multiple server instances.


