FastAPI Role-Based Access

When building web applications, controlling who can access what is essential for security. Role-Based Access Control (RBAC) is a powerful approach that restricts system access based on the roles assigned to individual users. In this tutorial, you'll learn how to implement RBAC in FastAPI applications.

What is Role-Based Access Control?

Role-Based Access Control (RBAC) is an authorization strategy that grants or denies access based on a user's assigned role. Unlike simple authentication (which verifies identity), RBAC focuses on what users are allowed to do once they're authenticated.

Key components of RBAC:

  • Users: Individuals accessing the system
  • Roles: Categories that define access levels (e.g., admin, editor, viewer)
  • Permissions: Specific actions that can be performed
  • Resources: The items being accessed (e.g., endpoints, data)

Setting Up Basic RBAC in FastAPI

Let's implement a basic RBAC system in FastAPI. We'll start by creating the necessary dependencies and models.

Step 1: Install Required Dependencies

bash
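# Quote the extras so shells such as zsh do not expand the brackets;
# uvicorn is the ASGI server used to run the app
pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart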

Step 2: Define User and Role Models

First, let's define our user and role models:

python
from enum import Enum
from typing import List, Optional
from pydantic import BaseModel

class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"

class User(BaseModel):
    username: str
    email: str
    disabled: Optional[bool] = False
    roles: List[Role] = [Role.USER]

class UserInDB(User):
    hashed_password: str
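
Because Role mixes in str, its members behave like plain strings, which is convenient when role names arrive as text from a database row or a token claim. The following is a minimal sketch of that behaviour using only the standard library. The parse_roles helper is hypothetical and introduced for illustration; it is not part of the tutorial's code.

```python
from enum import Enum
from typing import Iterable, List


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


# Members compare equal to their plain string values.
assert Role.ADMIN == "admin"

# Calling the enum with a value looks up the matching member.
assert Role("editor") is Role.EDITOR


def parse_roles(raw_roles: Iterable[str]) -> List[Role]:
    """Hypothetical helper: convert raw strings into Role members.

    Role(value) raises ValueError for a name that is not a defined role,
    so unknown roles are rejected instead of being silently accepted.
    """
    return [Role(value) for value in raw_roles]
```

Rejecting unknown names at the boundary means the rest of the application only ever sees valid Role members.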

Step 3: Create a Mock Database

For this tutorial, we'll use a simple dictionary to simulate a database:

python
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER]
    },
    "alice": {
        "username": "alice",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER, Role.EDITOR]
    },
    "admin": {
        "username": "admin",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER, Role.EDITOR, Role.ADMIN]
    }
}

Step 4: Implement Authentication with JWT

We'll use JSON Web Tokens (JWTs) for authentication. Each token carries the username in its sub claim and an expiry time in its exp claim:

python
from datetime import datetime, timedelta, timezone
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Security configurations
SECRET_KEY = "YOUR_SECRET_KEY"  # In production, use a secure random key
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use a timezone-aware UTC time
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # jwt.decode verifies the signature and rejects expired tokens
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Optional[str] = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

Step 5: Create Role-Based Authorization Dependencies

Now, let's create a dependency factory that checks whether the current user holds every role in a required list:

python
def has_role(required_roles: List[Role]):
    async def role_checker(current_user: User = Depends(get_current_active_user)):
        # The user must hold every role in required_roles
        for role in required_roles:
            if role not in current_user.roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    # role.value gives "admin" on every Python version
                    detail=f"Role {role.value} required"
                )
        return current_user
    return role_checker

# Create specific role-based dependencies
require_admin = has_role([Role.ADMIN])
require_editor = has_role([Role.EDITOR])
require_user = has_role([Role.USER])
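
The role_checker loop rejects the request as soon as one listed role is missing, so passing several roles means "all of these". Some endpoints instead want "any of these" (for example, editor or admin). The sketch below contrasts the two checks as plain functions; has_all_roles and has_any_role are hypothetical names, and wiring either into a FastAPI dependency is left as an assumption.

```python
from enum import Enum
from typing import Iterable


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


def has_all_roles(user_roles: Iterable[Role], required: Iterable[Role]) -> bool:
    """True only if the user holds every required role (the tutorial's behaviour)."""
    return set(required).issubset(set(user_roles))


def has_any_role(user_roles: Iterable[Role], allowed: Iterable[Role]) -> bool:
    """True if the user holds at least one of the allowed roles."""
    return not set(allowed).isdisjoint(set(user_roles))


alice_roles = [Role.USER, Role.EDITOR]

# "All" semantics: alice lacks ADMIN, so the combined requirement fails.
print(has_all_roles(alice_roles, [Role.EDITOR, Role.ADMIN]))  # False

# "Any" semantics: holding EDITOR is enough.
print(has_any_role(alice_roles, [Role.EDITOR, Role.ADMIN]))   # True
```

Which semantics to use is a design choice; naming the dependency after it (for example, a require_any variant) helps avoid surprises.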

Step 6: Implement the Login Endpoint

python
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

Step 7: Create Role-Protected Endpoints

Now we can create endpoints that require specific roles:

python
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Any authenticated user can access this endpoint"""
    # current_user is a UserInDB instance; response_model=User keeps
    # hashed_password out of the response
    return current_user

@app.get("/admin-only")
async def admin_only(current_user: User = Depends(require_admin)):
    """Only users with admin role can access this endpoint"""
    # Return the username, not the whole object, to avoid leaking the password hash
    return {"message": "Hello Admin!", "user": current_user.username}

@app.get("/editor-content")
async def editor_content(current_user: User = Depends(require_editor)):
    """Only users with editor role can access this endpoint"""
    return {"message": "You can edit content", "user": current_user.username}

@app.get("/users/all", response_model=List[User])
async def get_all_users(current_user: User = Depends(require_admin)):
    """Only admins can view all users"""
    return [User(**user) for user in fake_users_db.values()]

Testing Our Role-Based Access Control

Let's see our RBAC system in action. All three mock users share the password "secret". The /token endpoint expects the credentials as form fields, and every protected request must carry the returned token in an Authorization: Bearer header:

  1. Log in as a regular user (johndoe):

    • Request: POST /token with username "johndoe" and password "secret"
    • Response: JWT access token
    • With this token, you can access /users/me but not /admin-only or /editor-content

  2. Log in as an editor (alice):

    • Request: POST /token with username "alice" and password "secret"
    • Response: JWT access token
    • With this token, you can access /users/me and /editor-content but not /admin-only

  3. Log in as an admin (admin):

    • Request: POST /token with username "admin" and password "secret"
    • Response: JWT access token
    • With this token, you can access all endpoints: /users/me, /editor-content, and /admin-only
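
The expected outcomes above can be written down as a small access matrix, which is handy as a checklist when testing by hand or as the basis for automated tests. This is a sketch that mirrors the mock users and endpoints from the earlier steps with plain strings; expected_status is a hypothetical helper and does not call the running application.

```python
from typing import Dict, Optional, Set

# Roles of the three mock users, copied from the mock database.
USER_ROLES: Dict[str, Set[str]] = {
    "johndoe": {"user"},
    "alice": {"user", "editor"},
    "admin": {"user", "editor", "admin"},
}

# Role required by each endpoint; None means any active, authenticated user.
ENDPOINT_REQUIRED_ROLE: Dict[str, Optional[str]] = {
    "/users/me": None,
    "/editor-content": "editor",
    "/admin-only": "admin",
    "/users/all": "admin",
}


def expected_status(username: str, path: str) -> int:
    """Hypothetical helper: the HTTP status a valid token for this user should get."""
    required = ENDPOINT_REQUIRED_ROLE[path]
    if required is None or required in USER_ROLES[username]:
        return 200
    return 403


# Print one row per user with the expected status for each endpoint.
for username in USER_ROLES:
    row = {path: expected_status(username, path) for path in ENDPOINT_REQUIRED_ROLE}
    print(username, row)
```

A request with no token or an invalid one is a separate case: it fails authentication with 401 before any role check runs.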

Advanced RBAC Implementation

For larger applications, you might want to implement more granular permissions. Here's an extended approach:

Adding Permissions to Roles

python
from enum import Enum
from typing import List, Optional, Set

class Permission(str, Enum):
    READ_ITEMS = "read:items"
    CREATE_ITEMS = "create:items"
    UPDATE_ITEMS = "update:items"
    DELETE_ITEMS = "delete:items"
    READ_USERS = "read:users"
    MANAGE_USERS = "manage:users"

# Define role permissions
ROLE_PERMISSIONS = {
    Role.USER: {Permission.READ_ITEMS},
    Role.EDITOR: {Permission.READ_ITEMS, Permission.CREATE_ITEMS, Permission.UPDATE_ITEMS},
    Role.ADMIN: {Permission.READ_ITEMS, Permission.CREATE_ITEMS,
                 Permission.UPDATE_ITEMS, Permission.DELETE_ITEMS,
                 Permission.READ_USERS, Permission.MANAGE_USERS}
}

def get_user_permissions(user_roles: List[Role]) -> Set[Permission]:
    """Calculate all permissions a user has based on their roles"""
    permissions = set()
    for role in user_roles:
        permissions.update(ROLE_PERMISSIONS.get(role, set()))
    return permissions

def has_permission(required_permission: Permission):
    """Dependency to check if user has a specific permission"""
    async def permission_checker(current_user: User = Depends(get_current_active_user)):
        user_permissions = get_user_permissions(current_user.roles)
        if required_permission not in user_permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                # .value gives "create:items" on every Python version
                detail=f"Permission {required_permission.value} required"
            )
        return current_user
    return permission_checker
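
A user's effective permissions are the union of the permission sets of all their roles. The sketch below works through that calculation with a trimmed copy of the enums and mapping (item permissions only), so it runs on its own without FastAPI; the trimming is an assumption made to keep the example short.

```python
from enum import Enum
from typing import Dict, Iterable, Set


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


class Permission(str, Enum):
    READ_ITEMS = "read:items"
    CREATE_ITEMS = "create:items"
    UPDATE_ITEMS = "update:items"
    DELETE_ITEMS = "delete:items"


ROLE_PERMISSIONS: Dict[Role, Set[Permission]] = {
    Role.USER: {Permission.READ_ITEMS},
    Role.EDITOR: {Permission.READ_ITEMS, Permission.CREATE_ITEMS, Permission.UPDATE_ITEMS},
    Role.ADMIN: set(Permission),  # every permission in this trimmed example
}


def get_user_permissions(user_roles: Iterable[Role]) -> Set[Permission]:
    """Union of the permission sets of every role the user holds."""
    permissions: Set[Permission] = set()
    for role in user_roles:
        permissions.update(ROLE_PERMISSIONS.get(role, set()))
    return permissions


# alice holds USER and EDITOR; the overlapping READ_ITEMS is counted once.
alice = get_user_permissions([Role.USER, Role.EDITOR])
print(sorted(p.value for p in alice))
# → ['create:items', 'read:items', 'update:items']

# Set difference shows what ADMIN grants beyond EDITOR.
admin_extra = get_user_permissions([Role.ADMIN]) - get_user_permissions([Role.EDITOR])
print(sorted(p.value for p in admin_extra))
# → ['delete:items']
```

Because the result is a set, checking a single permission is a constant-time membership test regardless of how many roles contributed to it.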

Using Permission-Based Endpoints

python
@app.post("/items/")
async def create_item(current_user: User = Depends(has_permission(Permission.CREATE_ITEMS))):
    """Only users who can create items can access this endpoint"""
    return {"message": "Item created successfully"}

@app.delete("/items/{item_id}")
async def delete_item(
    item_id: int,
    current_user: User = Depends(has_permission(Permission.DELETE_ITEMS))
):
    """Only users who can delete items can access this endpoint"""
    return {"message": f"Item {item_id} deleted successfully"}

Real-World Example: Blog API with RBAC

Let's implement a simplified blog API with different access levels:

python
from fastapi import Depends, HTTPException, status
from typing import List, Dict, Any

# Reuse the app, User, Role, Permission, get_current_active_user and
# has_permission objects defined in the earlier steps. Creating a second
# FastAPI() instance here would drop the /token route registered above.

# Mock blog post database
blog_posts = {
    1: {"id": 1, "title": "First Post", "content": "Hello World", "published": True},
    2: {"id": 2, "title": "Draft Post", "content": "Work in progress", "published": False}
}

@app.get("/posts/", response_model=List[Dict[str, Any]])
async def list_posts(current_user: User = Depends(get_current_active_user)):
    """
    List posts logic:
    - Admins and editors see all posts (published and drafts)
    - Regular users see only published posts
    """
    if Role.ADMIN in current_user.roles or Role.EDITOR in current_user.roles:
        return list(blog_posts.values())
    else:
        return [post for post in blog_posts.values() if post["published"]]

@app.get("/posts/{post_id}")
async def get_post(post_id: int, current_user: User = Depends(get_current_active_user)):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")

    post = blog_posts[post_id]

    # Check if user can access unpublished posts
    if not post["published"] and (Role.ADMIN not in current_user.roles and
                                  Role.EDITOR not in current_user.roles):
        raise HTTPException(status_code=403, detail="Access denied")

    return post

@app.post("/posts/")
async def create_post(
    post: dict,
    current_user: User = Depends(has_permission(Permission.CREATE_ITEMS))
):
    # default=0 keeps this working after every post has been deleted
    post_id = max(blog_posts.keys(), default=0) + 1
    new_post = {
        "id": post_id,
        "title": post["title"],
        "content": post["content"],
        "published": post.get("published", False)
    }
    blog_posts[post_id] = new_post
    return new_post

@app.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: dict,
    current_user: User = Depends(has_permission(Permission.UPDATE_ITEMS))
):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")

    blog_posts[post_id].update(post_data)
    return blog_posts[post_id]

@app.delete("/posts/{post_id}")
async def delete_post(
    post_id: int,
    current_user: User = Depends(has_permission(Permission.DELETE_ITEMS))
):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")

    deleted_post = blog_posts.pop(post_id)
    return {"message": f"Post '{deleted_post['title']}' deleted successfully"}
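
The draft-visibility rule appears twice in the endpoints above (in list_posts and in get_post). One way to keep the two in sync is to move the rule into small pure functions that both endpoints could call. The sketch below shows the idea with plain strings for roles; can_view_drafts and visible_posts are hypothetical helpers, not part of the tutorial's code.

```python
from typing import Any, Dict, Iterable, List

# Roles allowed to see unpublished posts, matching the rule in the endpoints.
DRAFT_VIEWER_ROLES = {"admin", "editor"}

Post = Dict[str, Any]


def can_view_drafts(roles: Iterable[str]) -> bool:
    """True if at least one of the user's roles may see unpublished posts."""
    return not DRAFT_VIEWER_ROLES.isdisjoint(roles)


def visible_posts(posts: Dict[int, Post], roles: Iterable[str]) -> List[Post]:
    """All posts for draft viewers, published posts only for everyone else."""
    if can_view_drafts(roles):
        return list(posts.values())
    return [post for post in posts.values() if post["published"]]


blog_posts = {
    1: {"id": 1, "title": "First Post", "content": "Hello World", "published": True},
    2: {"id": 2, "title": "Draft Post", "content": "Work in progress", "published": False},
}

print([post["title"] for post in visible_posts(blog_posts, ["user"])])
# → ['First Post']
print([post["title"] for post in visible_posts(blog_posts, ["user", "editor"])])
# → ['First Post', 'Draft Post']
```

Pure functions like these can be unit-tested without starting the application or issuing tokens.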

Best Practices for RBAC in FastAPI

  1. Keep Roles Simple: Start with a few roles and expand as needed
  2. Use Dependency Injection: FastAPI's dependency system is perfect for role checks
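  3. Cache Permissions: For larger applications, cache each user's resolved permissions to reduce database calls, and invalidate the cache when roles change
  4. JWT Claims: Include roles or permissions in JWT tokens (but keep token size manageable); claims stay valid until the token expires, so a role change takes effect only when a new token is issued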
  5. Audit Trails: Log access attempts and role-based actions for security auditing
  6. Hierarchical Roles: Consider implementing role hierarchy (admin includes editor permissions, etc.)
  7. Regular Reviews: Periodically review role assignments and permissions
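
As an illustration of the hierarchical-roles practice above, a role hierarchy can be expressed as a mapping from each role to the roles it inherits, then expanded before any role check. This is a minimal sketch under the assumption of an admin > editor > user hierarchy; ROLE_INHERITS and expand_roles are hypothetical names.

```python
from enum import Enum
from typing import Dict, Iterable, List, Set


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


# Each role lists the roles it directly inherits (assumed hierarchy).
ROLE_INHERITS: Dict[Role, List[Role]] = {
    Role.ADMIN: [Role.EDITOR],
    Role.EDITOR: [Role.USER],
    Role.USER: [],
}


def expand_roles(assigned: Iterable[Role]) -> Set[Role]:
    """Return the assigned roles plus every role reachable through inheritance."""
    expanded: Set[Role] = set()
    pending = list(assigned)
    while pending:
        role = pending.pop()
        if role in expanded:
            continue  # already visited; also guards against cycles in the mapping
        expanded.add(role)
        pending.extend(ROLE_INHERITS.get(role, []))
    return expanded


# A user stored with only the ADMIN role effectively holds all three.
print(sorted(role.value for role in expand_roles([Role.ADMIN])))
# → ['admin', 'editor', 'user']
```

With expansion in place, the mock database would only need to store the highest role per user instead of listing every implied role.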

Common Issues and Solutions

  1. Circular Dependencies: Avoid circular imports by structuring your app properly
  2. Token Size: Don't overload JWTs with too many roles/permissions
  3. Performance: Store roles and permissions in sets so membership checks take constant time, and avoid recomputing a user's permissions on every request
  4. Testing: Create test fixtures for different user roles
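
For the performance point above, one lightweight option is to memoize the role-to-permission resolution with functools.lru_cache from the standard library. The sketch below assumes roles and permissions are plain strings and that the mapping is static; permissions_for is a hypothetical helper, and a multi-process deployment would need a shared cache instead.

```python
from functools import lru_cache
from typing import Dict, FrozenSet

# Static role-to-permission mapping (assumed to change only on redeploy).
ROLE_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "user": frozenset({"read:items"}),
    "editor": frozenset({"read:items", "create:items", "update:items"}),
    "admin": frozenset({"read:items", "create:items", "update:items", "delete:items"}),
}


@lru_cache(maxsize=256)
def permissions_for(roles: FrozenSet[str]) -> FrozenSet[str]:
    """Resolve a set of roles to permissions; results are cached per role set.

    The argument must be hashable, hence frozenset rather than list or set.
    """
    granted = set()
    for role in roles:
        granted |= ROLE_PERMISSIONS.get(role, frozenset())
    return frozenset(granted)


def user_can(roles: FrozenSet[str], permission: str) -> bool:
    """Constant-time membership test on the cached permission set."""
    return permission in permissions_for(roles)
```

If the mapping is ever changed at runtime, calling permissions_for.cache_clear() discards the stale entries.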

Summary

In this tutorial, you learned:

  • What Role-Based Access Control (RBAC) is and why it's important
  • How to implement basic RBAC in FastAPI using dependencies
  • How to extend RBAC with granular permissions
  • Best practices for implementing RBAC in production applications

Role-Based Access Control provides a solid security foundation for your FastAPI applications. By controlling what users can access based on their roles, you can create more secure and flexible web APIs.

Exercises

  1. Implement an RBAC system with role hierarchy (admin > editor > user)
  2. Create a permission-based system where permissions can be assigned directly to users
  3. Add a new endpoint that allows admins to change user roles
  4. Implement a caching system for user permissions to improve performance
  5. Create unit tests for your RBAC implementation using pytest

Happy coding!


