FastAPI Role-Based Access
When building web applications, controlling who can access what is essential for security. Role-Based Access Control (RBAC) is a powerful approach that restricts system access based on the roles assigned to individual users. In this tutorial, you'll learn how to implement RBAC in FastAPI applications.
What is Role-Based Access Control?
Role-Based Access Control (RBAC) is an authorization strategy that grants or denies access based on a user's assigned role. Unlike simple authentication (which verifies identity), RBAC focuses on what users are allowed to do once they're authenticated.
Key components of RBAC:
- Users: Individuals accessing the system
- Roles: Categories that define access levels (e.g., admin, editor, viewer)
- Permissions: Specific actions that can be performed
- Resources: The items being accessed (e.g., endpoints, data)
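To make these four components concrete, here is a minimal sketch in plain Python, independent of FastAPI. The user names, role names, the `ROLE_GRANTS` table, and the `is_allowed` helper are all illustrative assumptions, not part of any library.

```python
# Illustrative only: hypothetical data showing how users, roles,
# permissions, and resources relate in RBAC.
USER_ROLES = {
    "alice": {"editor"},
    "bob": {"viewer"},
}

# Each role maps to the (action, resource) pairs it is allowed to perform.
ROLE_GRANTS = {
    "viewer": {("read", "articles")},
    "editor": {("read", "articles"), ("update", "articles")},
}


def is_allowed(username: str, action: str, resource: str) -> bool:
    """Return True if any of the user's roles grants the action on the resource."""
    for role in USER_ROLES.get(username, set()):
        if (action, resource) in ROLE_GRANTS.get(role, set()):
            return True
    return False


print(is_allowed("alice", "update", "articles"))  # True
print(is_allowed("bob", "update", "articles"))    # False
```

Users never receive permissions directly in this model. Changing what a role may do updates every user who holds that role.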
Setting Up Basic RBAC in FastAPI
Let's implement a basic RBAC system in FastAPI. We'll start by creating the necessary dependencies and models.
Step 1: Install Required Dependencies
pip install fastapi "python-jose[cryptography]" "passlib[bcrypt]" python-multipart
Step 2: Define User and Role Models
First, let's define our user and role models:
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


class User(BaseModel):
    username: str
    email: str
    disabled: Optional[bool] = False
    roles: List[Role] = [Role.USER]


class UserInDB(User):
    hashed_password: str
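Because `Role` subclasses both `str` and `Enum`, its members compare equal to plain strings and can be parsed back from strings, such as those stored in a database or a token. The short sketch below redefines `Role` so that it runs on its own. `parse_roles` is a hypothetical helper, not something the tutorial's code depends on.

```python
from enum import Enum
from typing import List


class Role(str, Enum):
    ADMIN = "admin"
    EDITOR = "editor"
    USER = "user"


def parse_roles(raw: List[str]) -> List[Role]:
    """Convert raw strings to Role members; unknown names raise ValueError."""
    return [Role(value) for value in raw]


roles = parse_roles(["user", "editor"])
print(Role.EDITOR in roles)   # True
print(Role.ADMIN == "admin")  # True: the str mixin allows string comparison

try:
    parse_roles(["superuser"])
except ValueError as exc:
    print("rejected:", exc)
```

Rejecting unknown role names at the boundary means a typo in stored data fails loudly instead of silently granting or denying access.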
Step 3: Create a Mock Database
For this tutorial, we'll use a simple dictionary to simulate a database:
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER],
    },
    "alice": {
        "username": "alice",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER, Role.EDITOR],
    },
    "admin": {
        "username": "admin",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
        "roles": [Role.USER, Role.EDITOR, Role.ADMIN],
    },
}
Step 4: Implement Authentication with JWT
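We'll use JSON Web Tokens (JWTs) for authentication: the login endpoint issues a signed token, and every protected endpoint verifies it before looking up the user.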
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Security configurations
SECRET_KEY = "YOUR_SECRET_KEY"  # In production, load a long random key from the environment
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)
    return None


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Raises JWTError if the signature is invalid or the token has expired
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
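To show what `jwt.encode` and `jwt.decode` do conceptually, the following standard-library sketch signs and verifies an HS256 token by hand. It is for illustration only: the helper names (`sign_token`, `verify_token`) and the `demo-secret` key are assumptions, and it skips checks a real library performs, such as validating the header's `alg` field. In the application itself, keep using python-jose.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(claims: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_token(token: str, secret: str) -> dict:
    """Return the claims if the signature is valid and the token has not expired."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    signing_input = f"{header_b64}.{payload_b64}"
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    claims = json.loads(_b64url_decode(payload_b64))
    # Tokens without an "exp" claim are treated as expired in this sketch
    if claims.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return claims


token = sign_token({"sub": "alice", "exp": int(time.time()) + 60}, "demo-secret")
print(verify_token(token, "demo-secret")["sub"])  # alice
```

The payload is only encoded, not encrypted, so anyone holding the token can read its claims. The signature only guarantees that the claims were issued by someone who knows the secret.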
Step 5: Create Role-Based Authorization Dependencies
Now, let's create dependencies that check if a user has specific roles:
def has_role(required_roles: List[Role]):
    """Build a dependency that requires the user to hold every role in required_roles."""

    async def role_checker(current_user: User = Depends(get_current_active_user)):
        for role in required_roles:
            if role not in current_user.roles:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    # role.value gives "admin" rather than "Role.ADMIN" on every Python version
                    detail=f"Role {role.value} required",
                )
        return current_user

    return role_checker


# Create specific role-based dependencies
require_admin = has_role([Role.ADMIN])
require_editor = has_role([Role.EDITOR])
require_user = has_role([Role.USER])
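Note that `has_role` requires the user to hold every role in `required_roles`. If an endpoint should instead accept any one of several roles, the check needs "any" semantics. The sketch below contrasts the two as plain functions. The names `has_all_roles` and `has_any_role` are hypothetical, and roles are shown as plain strings to keep the example self-contained.

```python
from typing import Iterable


def has_all_roles(user_roles: Iterable[str], required: Iterable[str]) -> bool:
    """True only if the user holds every required role (what has_role enforces)."""
    return set(required).issubset(set(user_roles))


def has_any_role(user_roles: Iterable[str], allowed: Iterable[str]) -> bool:
    """True if the user holds at least one of the allowed roles."""
    return not set(allowed).isdisjoint(set(user_roles))


alice = ["user", "editor"]
print(has_all_roles(alice, ["editor", "admin"]))  # False: alice is not an admin
print(has_any_role(alice, ["editor", "admin"]))   # True: editor is enough
```

The two functions also differ on an empty requirement list: `has_all_roles` accepts everyone, while `has_any_role` rejects everyone.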
Step 6: Implement the Login Endpoint
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}
Step 7: Create Role-Protected Endpoints
Now we can create endpoints that require specific roles:
# current_user is a UserInDB instance, so filter it through response_model=User
# (or return selected fields) to avoid sending hashed_password to the client.
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    """Any authenticated user can access this endpoint"""
    return current_user


@app.get("/admin-only")
async def admin_only(current_user: User = Depends(require_admin)):
    """Only users with admin role can access this endpoint"""
    return {"message": "Hello Admin!", "user": current_user.username}


@app.get("/editor-content")
async def editor_content(current_user: User = Depends(require_editor)):
    """Only users with editor role can access this endpoint"""
    return {"message": "You can edit content", "user": current_user.username}


@app.get("/users/all", response_model=List[User])
async def get_all_users(current_user: User = Depends(require_admin)):
    """Only admins can view all users"""
    return [User(**user) for user in fake_users_db.values()]
Testing Our Role-Based Access Control
Let's see our RBAC system in action:
- Login as regular user (johndoe):
  - Request: POST /token with username "johndoe" and password "secret"
  - Response: JWT access token
  - With this token, you can access /users/me but not /admin-only or /editor-content
- Login as editor (alice):
  - Request: POST /token with username "alice" and password "secret"
  - Response: JWT access token
  - With this token, you can access /users/me and /editor-content but not /admin-only
- Login as admin:
  - Request: POST /token with username "admin" and password "secret"
  - Response: JWT access token
  - With this token, you can access all endpoints: /users/me, /editor-content, and /admin-only
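The requests in these scenarios can be built with nothing but the standard library. The sketch below only constructs the requests and does not send them. The base URL `http://localhost:8000` and the helper names are assumptions about a local setup.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8000"  # assumption: where the app is served locally


def build_token_request(username: str, password: str) -> Request:
    """POST /token with form-encoded credentials, as OAuth2PasswordRequestForm expects."""
    body = urlencode({"username": username, "password": password}).encode()
    return Request(
        f"{BASE_URL}/token",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def build_authorized_request(path: str, token: str) -> Request:
    """GET a protected path with the bearer token in the Authorization header."""
    return Request(f"{BASE_URL}{path}", headers={"Authorization": f"Bearer {token}"})


login = build_token_request("johndoe", "secret")
print(login.get_method(), login.full_url)  # POST http://localhost:8000/token
```

With the server running, passing these objects to `urllib.request.urlopen` performs the calls. A 401 or 403 response is raised as `urllib.error.HTTPError`, which is how a denied role shows up on the client side.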
Advanced RBAC Implementation
For larger applications, you might want to implement more granular permissions. Here's an extended approach:
Adding Permissions to Roles
from enum import Enum
from typing import List, Optional, Set


class Permission(str, Enum):
    READ_ITEMS = "read:items"
    CREATE_ITEMS = "create:items"
    UPDATE_ITEMS = "update:items"
    DELETE_ITEMS = "delete:items"
    READ_USERS = "read:users"
    MANAGE_USERS = "manage:users"


# Define role permissions
ROLE_PERMISSIONS = {
    Role.USER: {Permission.READ_ITEMS},
    Role.EDITOR: {Permission.READ_ITEMS, Permission.CREATE_ITEMS, Permission.UPDATE_ITEMS},
    Role.ADMIN: {
        Permission.READ_ITEMS, Permission.CREATE_ITEMS,
        Permission.UPDATE_ITEMS, Permission.DELETE_ITEMS,
        Permission.READ_USERS, Permission.MANAGE_USERS,
    },
}


def get_user_permissions(user_roles: List[Role]) -> Set[Permission]:
    """Calculate all permissions a user has based on their roles"""
    permissions = set()
    for role in user_roles:
        permissions.update(ROLE_PERMISSIONS.get(role, set()))
    return permissions


def has_permission(required_permission: Permission):
    """Dependency to check if user has a specific permission"""

    async def permission_checker(current_user: User = Depends(get_current_active_user)):
        user_permissions = get_user_permissions(current_user.roles)
        if required_permission not in user_permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                # .value gives "create:items" rather than the enum member's repr
                detail=f"Permission {required_permission.value} required",
            )
        return current_user

    return permission_checker
Using Permission-Based Endpoints
@app.post("/items/")
async def create_item(current_user: User = Depends(has_permission(Permission.CREATE_ITEMS))):
    """Only users who can create items can access this endpoint"""
    return {"message": "Item created successfully"}


@app.delete("/items/{item_id}")
async def delete_item(
    item_id: int,
    current_user: User = Depends(has_permission(Permission.DELETE_ITEMS)),
):
    """Only users who can delete items can access this endpoint"""
    return {"message": f"Item {item_id} deleted successfully"}
Real-World Example: Blog API with RBAC
Let's implement a simplified blog API with different access levels:
from fastapi import FastAPI, Depends, HTTPException, status
from typing import List, Dict, Any

# This example continues the module built above: it reuses app, User, Role,
# Permission, get_current_active_user, and has_permission. Do not create a
# second FastAPI() instance here, or the /token route registered earlier is lost.

# Mock blog post database
blog_posts = {
    1: {"id": 1, "title": "First Post", "content": "Hello World", "published": True},
    2: {"id": 2, "title": "Draft Post", "content": "Work in progress", "published": False},
}


@app.get("/posts/", response_model=List[Dict[str, Any]])
async def list_posts(current_user: User = Depends(get_current_active_user)):
    """
    List posts logic:
    - Admins and editors see all posts (published and drafts)
    - Regular users see only published posts
    """
    if Role.ADMIN in current_user.roles or Role.EDITOR in current_user.roles:
        return list(blog_posts.values())
    else:
        return [post for post in blog_posts.values() if post["published"]]


@app.get("/posts/{post_id}")
async def get_post(post_id: int, current_user: User = Depends(get_current_active_user)):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")
    post = blog_posts[post_id]
    # Check if user can access unpublished posts
    if not post["published"] and (Role.ADMIN not in current_user.roles and
                                  Role.EDITOR not in current_user.roles):
        raise HTTPException(status_code=403, detail="Access denied")
    return post


@app.post("/posts/")
async def create_post(
    post: dict,
    current_user: User = Depends(has_permission(Permission.CREATE_ITEMS)),
):
    # default=0 keeps this working after every post has been deleted
    post_id = max(blog_posts.keys(), default=0) + 1
    new_post = {
        "id": post_id,
        "title": post["title"],
        "content": post["content"],
        "published": post.get("published", False),
    }
    blog_posts[post_id] = new_post
    return new_post


@app.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_data: dict,
    current_user: User = Depends(has_permission(Permission.UPDATE_ITEMS)),
):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")
    blog_posts[post_id].update(post_data)
    return blog_posts[post_id]


@app.delete("/posts/{post_id}")
async def delete_post(
    post_id: int,
    current_user: User = Depends(has_permission(Permission.DELETE_ITEMS)),
):
    if post_id not in blog_posts:
        raise HTTPException(status_code=404, detail="Post not found")
    deleted_post = blog_posts.pop(post_id)
    return {"message": f"Post '{deleted_post['title']}' deleted successfully"}
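The draft-visibility rule appears twice in the blog example, once when listing posts and once when fetching a single post. One possible refactoring, sketched here with hypothetical helper names and plain-string roles, pulls that rule into small pure functions that can be unit tested without starting the API.

```python
from typing import Dict, Iterable, List

# Assumption: the same two roles as in the example may see unpublished posts.
DRAFT_VIEWER_ROLES = {"admin", "editor"}


def can_view_drafts(roles: Iterable[str]) -> bool:
    """True if at least one of the user's roles may see unpublished posts."""
    return not DRAFT_VIEWER_ROLES.isdisjoint(set(roles))


def visible_posts(posts: Dict[int, dict], roles: Iterable[str]) -> List[dict]:
    """Return every post for draft viewers, and only published posts otherwise."""
    if can_view_drafts(roles):
        return list(posts.values())
    return [post for post in posts.values() if post["published"]]


posts = {
    1: {"id": 1, "title": "First Post", "published": True},
    2: {"id": 2, "title": "Draft Post", "published": False},
}
print([p["id"] for p in visible_posts(posts, ["user"])])            # [1]
print([p["id"] for p in visible_posts(posts, ["user", "editor"])])  # [1, 2]
```

Keeping the rule in one place means a later change, such as letting authors see their own drafts, only has to be made once.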
Best Practices for RBAC in FastAPI
- Keep Roles Simple: Start with a few roles and expand as needed
- Use Dependency Injection: FastAPI's dependency system is perfect for role checks
- Cache Permissions: For larger applications, cache user permissions to reduce database calls
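- JWT Claims: Consider including roles or permissions in JWT claims so checks need no database lookup, but keep the token small and remember that a role change only takes effect once a new token is issued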
- Audit Trails: Log access attempts and role-based actions for security auditing
- Hierarchical Roles: Consider implementing role hierarchy (admin includes editor permissions, etc.)
- Regular Reviews: Periodically review role assignments and permissions
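The hierarchical-roles and caching suggestions can be combined. The sketch below is one way to do it, assuming a hypothetical single-parent hierarchy (admin inherits from editor, editor from user) with no cycles, and using `functools.lru_cache` as a simple in-process cache. The table and function names are illustrative.

```python
from functools import lru_cache
from typing import Dict, FrozenSet, Optional, Tuple

# Hypothetical hierarchy: each role inherits everything from its parent.
ROLE_PARENT: Dict[str, str] = {"admin": "editor", "editor": "user"}

# Permissions granted directly by each role (inherited ones are not repeated).
DIRECT_PERMISSIONS: Dict[str, FrozenSet[str]] = {
    "user": frozenset({"read:items"}),
    "editor": frozenset({"create:items", "update:items"}),
    "admin": frozenset({"delete:items", "read:users", "manage:users"}),
}


@lru_cache(maxsize=None)
def effective_permissions(roles: Tuple[str, ...]) -> FrozenSet[str]:
    """Resolve direct and inherited permissions; results are cached per role tuple."""
    granted = set()
    for role in roles:
        current: Optional[str] = role
        # Walk up the parent chain, collecting permissions at each level.
        while current is not None:
            granted |= DIRECT_PERMISSIONS.get(current, frozenset())
            current = ROLE_PARENT.get(current)
    return frozenset(granted)


perms = effective_permissions(("admin",))
print("read:items" in perms)  # True: inherited from user via editor
```

The argument is a tuple because `lru_cache` needs hashable inputs. Sorting the roles before building the tuple avoids caching the same combination twice. If role definitions change at runtime, call `effective_permissions.cache_clear()`.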
Common Issues and Solutions
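- Circular Dependencies: Avoid circular imports by keeping models, security helpers, and routers in separate modules, so that routers import the dependencies and never the other way around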
- Token Size: Don't overload JWTs with too many roles/permissions
- Performance: For applications with many roles, store roles and permissions in sets so each check is a constant-time membership test
- Testing: Create test fixtures for different user roles
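As a starting point for the testing advice, the sketch below defines one fixture per role combination and checks an access matrix with the standard library's `unittest`. The `check_access` function is a stand-in, assumed here to mirror the role dependency. In a real suite it would be replaced by requests sent through FastAPI's `TestClient`.

```python
import unittest
from typing import Dict, Set

# Test fixtures: one representative user per role combination in the tutorial.
ROLE_FIXTURES: Dict[str, Set[str]] = {
    "johndoe": {"user"},
    "alice": {"user", "editor"},
    "admin": {"user", "editor", "admin"},
}


def check_access(user_roles: Set[str], required_role: str) -> int:
    """Stand-in for the role dependency: 200 if allowed, 403 otherwise."""
    return 200 if required_role in user_roles else 403


class RoleAccessTests(unittest.TestCase):
    def test_access_matrix(self):
        cases = [
            ("johndoe", "editor", 403),
            ("johndoe", "admin", 403),
            ("alice", "editor", 200),
            ("alice", "admin", 403),
            ("admin", "editor", 200),
            ("admin", "admin", 200),
        ]
        for username, required, expected in cases:
            # subTest reports each failing combination separately
            with self.subTest(user=username, required=required):
                self.assertEqual(
                    check_access(ROLE_FIXTURES[username], required), expected
                )
```

Saved in a file such as `test_roles.py` (a hypothetical name), this runs with `python -m unittest`. Listing the denied cases explicitly matters as much as the allowed ones, since a missing 403 is the failure RBAC exists to prevent.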
Summary
In this tutorial, you learned:
- What Role-Based Access Control (RBAC) is and why it's important
- How to implement basic RBAC in FastAPI using dependencies
- How to extend RBAC with granular permissions
- Best practices for implementing RBAC in production applications
Role-Based Access Control provides a solid security foundation for your FastAPI applications. By controlling what users can access based on their roles, you can create more secure and flexible web APIs.
Additional Resources
- FastAPI Official Documentation on Security
- OAuth2 with Password (and hashing), Bearer with JWT tokens
- OWASP Authorization Cheat Sheet
Exercises
- Implement an RBAC system with role hierarchy (admin > editor > user)
- Create a permission-based system where permissions can be assigned directly to users
- Add a new endpoint that allows admins to change user roles
- Implement a caching system for user permissions to improve performance
- Create unit tests for your RBAC implementation using pytest
Happy coding!