FastAPI Authorization
Introduction
Authorization is a critical security concept that determines what resources a user can access and what actions they can perform after they've been authenticated. While authentication verifies who a user is, authorization determines what they're allowed to do.
In this tutorial, we'll explore how to implement authorization in FastAPI applications, starting with basic concepts and moving to more advanced techniques. By the end, you'll be able to secure your FastAPI endpoints with role-based access control and custom permission systems.
Prerequisites
Before diving into authorization, you should have:
- Basic knowledge of FastAPI
- Understanding of FastAPI Authentication
- Python 3.7+ installed
- A FastAPI project set up
Basic Authorization Concepts
The Difference Between Authentication and Authorization
First, let's clarify the distinction:
- Authentication: Verifies the identity of a user (Who are you?)
- Authorization: Determines the access rights of a user (What are you allowed to do?)
Authorization Flow in FastAPI
- A user sends a request with authentication credentials (token, username/password)
- The server authenticates the user
- If authentication succeeds, the application checks if the user has the required permissions
- Based on the authorization check, the request is either processed or rejected
Implementing Basic Role-Based Authorization
Let's start with a simple role-based authorization system using FastAPI's dependency injection.
Step 1: Define User Roles
from enum import Enum
class UserRole(str, Enum):
ADMIN = "admin"
MANAGER = "manager"
USER = "user"
Step 2: Create a User Model with Roles
from pydantic import BaseModel
from typing import Optional, List
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
roles: List[UserRole] = [UserRole.USER] # Default role
Step 3: Implement Role-Based Authorization Dependencies
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Mock database of users
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
"disabled": False,
"roles": [UserRole.USER]
},
"alice": {
"username": "alice",
"full_name": "Alice Smith",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
"disabled": False,
"roles": [UserRole.USER, UserRole.MANAGER]
},
"admin": {
"username": "admin",
"full_name": "Admin",
"email": "[email protected]",
"hashed_password": "fakehashedsecret3",
"disabled": False,
"roles": [UserRole.USER, UserRole.ADMIN]
}
}
def get_user(username: str):
if username in fake_users_db:
user_dict = fake_users_db[username]
return User(**user_dict)
return None
async def get_current_user(token: str = Depends(oauth2_scheme)):
# In a real app, decode JWT token to get username
username = token # Simplified for demo
user = get_user(username)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# Role-based authorization dependency
def has_role(required_roles: List[UserRole]):
async def role_checker(current_user: User = Depends(get_current_active_user)):
for role in required_roles:
if role not in current_user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role {role} required"
)
return current_user
return role_checker
Step 4: Apply Role-Based Authorization to Endpoints
from fastapi import FastAPI, Depends
app = FastAPI()
@app.get("/users/me", tags=["users"])
async def read_users_me(current_user: User = Depends(get_current_active_user)):
"""Any authenticated user can access their own info"""
return current_user
@app.get("/users/all", tags=["users"])
async def read_all_users(current_user: User = Depends(has_role([UserRole.ADMIN]))):
"""Only admins can access all user data"""
return fake_users_db
@app.get("/reports", tags=["reports"])
async def get_reports(current_user: User = Depends(has_role([UserRole.MANAGER, UserRole.ADMIN]))):
"""Only managers and admins can access reports"""
return {"reports": ["Report 1", "Report 2"]}
Advanced Authorization: Permissions System
For more granular control, let's implement a permission-based system.
Step 1: Define Permissions
class Permission(str, Enum):
READ_ITEMS = "read:items"
WRITE_ITEMS = "write:items"
DELETE_ITEMS = "delete:items"
READ_USERS = "read:users"
WRITE_USERS = "write:users"
Step 2: Update User Model with Permissions
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
roles: List[UserRole] = [UserRole.USER]
permissions: List[Permission] = []
Step 3: Update Mock Database with Permissions
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": "fakehashedsecret",
"disabled": False,
"roles": [UserRole.USER],
"permissions": [Permission.READ_ITEMS]
},
"alice": {
"username": "alice",
"full_name": "Alice Smith",
"email": "[email protected]",
"hashed_password": "fakehashedsecret2",
"disabled": False,
"roles": [UserRole.USER, UserRole.MANAGER],
"permissions": [Permission.READ_ITEMS, Permission.WRITE_ITEMS, Permission.READ_USERS]
},
"admin": {
"username": "admin",
"full_name": "Admin",
"email": "[email protected]",
"hashed_password": "fakehashedsecret3",
"disabled": False,
"roles": [UserRole.USER, UserRole.ADMIN],
"permissions": [Permission.READ_ITEMS, Permission.WRITE_ITEMS, Permission.DELETE_ITEMS,
Permission.READ_USERS, Permission.WRITE_USERS]
}
}
Step 4: Implement Permission-Based Authorization
def has_permission(required_permissions: List[Permission]):
async def permission_checker(current_user: User = Depends(get_current_active_user)):
for permission in required_permissions:
if permission not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission {permission} required"
)
return current_user
return permission_checker
Step 5: Apply Permission-Based Authorization to Endpoints
@app.get("/items/", tags=["items"])
async def read_items(current_user: User = Depends(has_permission([Permission.READ_ITEMS]))):
"""Users with read:items permission can access this endpoint"""
return [{"item_id": "1", "name": "Item 1"}, {"item_id": "2", "name": "Item 2"}]
@app.post("/items/", tags=["items"])
async def create_item(current_user: User = Depends(has_permission([Permission.WRITE_ITEMS]))):
"""Users with write:items permission can access this endpoint"""
return {"status": "Item created"}
@app.delete("/items/{item_id}", tags=["items"])
async def delete_item(
item_id: str,
current_user: User = Depends(has_permission([Permission.DELETE_ITEMS]))
):
"""Users with delete:items permission can access this endpoint"""
return {"status": f"Item {item_id} deleted"}
Combining Roles and Permissions
In real-world applications, you might want to combine roles and permissions for more flexible authorization.
def authorize(roles=None, permissions=None):
if roles is None:
roles = []
if permissions is None:
permissions = []
async def authorization_checker(current_user: User = Depends(get_current_active_user)):
role_check = not roles or any(role in current_user.roles for role in roles)
permission_check = not permissions or all(perm in current_user.permissions for perm in permissions)
if not (role_check and permission_check):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
return authorization_checker
@app.put("/advanced-endpoint", tags=["advanced"])
async def advanced_endpoint(
current_user: User = Depends(
authorize(
roles=[UserRole.ADMIN, UserRole.MANAGER],
permissions=[Permission.WRITE_ITEMS]
)
)
):
"""This endpoint requires either ADMIN or MANAGER role AND write:items permission"""
return {"message": "Advanced operation completed"}
JWT Token-Based Authorization
In practice, you'll likely use JWT tokens for both authentication and authorization. Here's how to include role and permission claims in your JWT:
Step 1: Create JWT with Claims
from datetime import datetime, timedelta
from jose import JWTError, jwt
from typing import Dict
# Secret key for signing the JWT
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: Dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def create_user_token(user: User):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user.username,
"roles": [role for role in user.roles],
"permissions": [perm for perm in user.permissions]
},
expires_delta=access_token_expires
)
return access_token
Step 2: Token Verification with Role/Permission Checking
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user_from_token(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# Get roles and permissions from token
roles = payload.get("roles", [])
permissions = payload.get("permissions", [])
# Get user from database (you might want to verify against DB)
user = get_user(username)
if user is None:
raise credentials_exception
# Update user with roles and permissions from token
user.roles = roles
user.permissions = permissions
return user
except JWTError:
raise credentials_exception
Real-World Implementation: Securing a Blog API
Let's put it all together with a practical example of a blog API:
from fastapi import FastAPI, Depends, HTTPException, status
from typing import List, Optional
from pydantic import BaseModel
app = FastAPI()
# Blog-specific permissions
class BlogPermission(str, Enum):
READ_POSTS = "read:posts"
WRITE_POSTS = "write:posts"
EDIT_OWN_POSTS = "edit:own_posts"
EDIT_ANY_POST = "edit:any_post"
DELETE_OWN_POSTS = "delete:own_posts"
DELETE_ANY_POST = "delete:any_post"
MODERATE_COMMENTS = "moderate:comments"
# Blog post model
class BlogPost(BaseModel):
id: int
title: str
content: str
author_username: str
published: bool = False
# Mock database of blog posts
blog_posts = [
BlogPost(id=1, title="First post", content="Hello world!", author_username="johndoe", published=True),
BlogPost(id=2, title="Draft post", content="Work in progress...", author_username="johndoe", published=False),
BlogPost(id=3, title="Admin post", content="Admin content", author_username="admin", published=True)
]
# Permission-based access control
def can_read_posts(current_user: User = Depends(get_current_user_from_token)):
if BlogPermission.READ_POSTS not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to read posts"
)
return current_user
def can_write_posts(current_user: User = Depends(get_current_user_from_token)):
if BlogPermission.WRITE_POSTS not in current_user.permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to create posts"
)
return current_user
# Resource-based access control - can only edit own posts unless admin
def can_edit_post(post_id: int):
async def edit_permission_checker(current_user: User = Depends(get_current_user_from_token)):
# Find post
post = next((p for p in blog_posts if p.id == post_id), None)
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Check permissions
is_own_post = post.author_username == current_user.username
can_edit_own = is_own_post and BlogPermission.EDIT_OWN_POSTS in current_user.permissions
can_edit_any = BlogPermission.EDIT_ANY_POST in current_user.permissions
if not (can_edit_own or can_edit_any):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions to edit this post"
)
return current_user
return edit_permission_checker
# Blog endpoints
@app.get("/posts/", tags=["blog"])
async def list_posts(
current_user: User = Depends(can_read_posts),
include_unpublished: bool = False
):
"""List all published posts (or all posts if user has right permissions)"""
if include_unpublished and BlogPermission.EDIT_ANY_POST in current_user.permissions:
return blog_posts
# For regular users, filter out unpublished posts except their own
if BlogPermission.EDIT_OWN_POSTS in current_user.permissions:
return [
post for post in blog_posts
if post.published or post.author_username == current_user.username
]
# For readers, only show published posts
return [post for post in blog_posts if post.published]
@app.post("/posts/", tags=["blog"])
async def create_post(
post: BlogPost,
current_user: User = Depends(can_write_posts)
):
"""Create a new blog post"""
# Set the current user as author
post.author_username = current_user.username
# Add the post to the database
post.id = max(p.id for p in blog_posts) + 1
blog_posts.append(post)
return post
@app.put("/posts/{post_id}", tags=["blog"])
async def update_post(
post_id: int,
updated_post: BlogPost,
current_user: User = Depends(can_edit_post(post_id))
):
"""Update a blog post"""
# Find and update the post
for i, post in enumerate(blog_posts):
if post.id == post_id:
# Preserve the original author and ID
updated_post.id = post_id
updated_post.author_username = post.author_username
blog_posts[i] = updated_post
return updated_post
# This should never happen as the dependency already checks if the post exists
raise HTTPException(status_code=404, detail="Post not found")
Best Practices for Authorization in FastAPI
-
Separate Authentication from Authorization: Keep these concerns distinct in your code.
-
Use Least Privilege Principle: Grant only the minimum permissions necessary.
-
Implement Role-Based Access Control (RBAC): For coarse-grained access control.
-
Add Attribute-Based Access Control (ABAC): For fine-grained permissions.
-
Protect Against Security Pitfalls:
- No authorization bypass through alternative routes
- Check authorizations on each request (stateless authorization)
- Validate JWT tokens thoroughly
-
Log Authorization Failures: Monitor for potential attacks.
-
Implement Rate Limiting: Prevent brute force attempts.
-
Regular Security Reviews: Especially when adding new endpoints.
Summary
In this tutorial, we've explored how to implement authorization in FastAPI applications:
- We started with basic role-based authorization using dependency injection
- We built a permission-based authorization system for more granular control
- We combined roles and permissions for flexible access control
- We implemented JWT token-based authorization with role and permission claims
- We created a practical example with a blog API
Authorization is a critical aspect of web application security. By implementing proper authorization checks, you can ensure that users can only access the resources they're permitted to use, protecting your application and your users' data.
Additional Resources
- FastAPI Security Documentation
- JWT.io - Learn more about JSON Web Tokens
- OWASP Authorization Cheat Sheet
- OAuth 2.0 Simplified
Exercises
- Extend the blog API to include comment functionality with moderation permissions
- Implement a time-based authorization system (e.g., premium users get access to certain features for 30 days)
- Create a role hierarchy system where higher roles automatically inherit permissions from lower roles
- Add API endpoint for user administrators to assign/remove roles and permissions
- Implement OAuth2 authorization with an external provider like Google or GitHub
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)