FastAPI OAuth2 Implementation
Introduction
OAuth2 is an industry-standard protocol for authorization that enables third-party applications to obtain limited access to a user's account on a server. In this tutorial, we'll explore how to implement OAuth2 authentication in FastAPI applications to secure your APIs properly.
FastAPI provides built-in support for OAuth2 through its security utilities, making it straightforward to implement various authentication flows. We'll focus on the password flow (also known as Resource Owner Password Credentials) and how to use JWT (JSON Web Tokens) for maintaining authenticated sessions.
Prerequisites
Before we begin, make sure you have:
- Python 3.7+
- FastAPI installed (
pip install fastapi
) - Uvicorn for running the server (
pip install uvicorn
) - Python-jose for JWT operations (
pip install python-jose[cryptography]
) - Passlib for password hashing (
pip install passlib[bcrypt]
)
Understanding OAuth2 Concepts
OAuth2 involves several key components:
- Resource Owner: The user who owns the data (e.g., a user with an account)
- Client: The application requesting access to the user's data
- Authorization Server: The server that authenticates the user and issues tokens
- Resource Server: The server that hosts the protected resources
- Access Token: A credential used to access protected resources
In our implementation, FastAPI will act as both the Authorization Server and the Resource Server.
Basic OAuth2 Implementation with Password Flow
Let's start with a basic implementation of OAuth2 with password flow.
Step 1: Set up the necessary imports and create the app
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Secret key for JWT signing - in production, use a secure random key
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
Step 2: Define the models for users and tokens
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
Step 3: Create a mock user database and utility functions
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Mock database of users
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": pwd_context.hash("secret123"),
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Johnson",
"email": "[email protected]",
"hashed_password": pwd_context.hash("password456"),
"disabled": True,
},
}
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
Step 4: Implement the JWT token creation function
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
Step 5: Create the function to get the current user from a token
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
Step 6: Create the token endpoint
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
Step 7: Create a protected endpoint
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
Testing the OAuth2 Implementation
How to Test Your Implementation
You can test this implementation using the FastAPI automatic documentation interface (Swagger UI).
-
Run your FastAPI application using Uvicorn:
uvicorn main:app --reload
-
Open your browser and go to
http://127.0.0.1:8000/docs
-
Click on the "Authorize" button at the top
-
Enter your credentials (e.g., username: "johndoe", password: "secret123")
-
Now you can access the protected endpoints
Expected Results
When you call the /users/me/
endpoint after authentication, you should see a response like:
{
"username": "johndoe",
"email": "[email protected]",
"full_name": "John Doe",
"disabled": false
}
When you call the /users/me/items/
endpoint, you'll see:
[
{
"item_id": "Foo",
"owner": "johndoe"
}
]
If you try to access these endpoints without authentication, you'll receive a 401 Unauthorized error.
Advanced OAuth2 Features
Scopes
OAuth2 supports scopes to limit the actions an authenticated user can perform. Let's add scopes to our implementation:
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"read": "Read access", "write": "Write access"}
)
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
# Include scopes in the token
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# Endpoint that requires specific scope
@app.get("/items/")
async def read_items(
current_user: User = Depends(
Security(get_current_active_user, scopes=["read"])
)
):
return [{"item_id": 1, "owner": current_user.username}]
Refresh Tokens
To improve security while maintaining a good user experience, you can implement refresh tokens:
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str
# Add this function
def create_refresh_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=7) # Refresh tokens typically last longer
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token", response_model=TokenPair)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
@app.post("/refresh", response_model=Token)
async def refresh_token(token: str):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
# Create new access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
Real-world Application: Protecting an API for a Blog
Let's implement a simple blog API with OAuth2 protection:
# Adding blog post models
class PostBase(BaseModel):
title: str
content: str
published: bool = True
class PostCreate(PostBase):
pass
class Post(PostBase):
id: int
author: str
# Mock database for posts
posts_db = []
post_id_counter = 0
@app.post("/posts/", response_model=Post)
async def create_post(
post: PostCreate,
current_user: User = Depends(Security(get_current_active_user, scopes=["write"]))
):
global post_id_counter
post_id_counter += 1
new_post = Post(
id=post_id_counter,
author=current_user.username,
title=post.title,
content=post.content,
published=post.published
)
posts_db.append(new_post)
return new_post
@app.get("/posts/", response_model=list[Post])
async def get_posts(
current_user: User = Depends(Security(get_current_active_user, scopes=["read"]))
):
return posts_db
@app.get("/posts/{post_id}", response_model=Post)
async def get_post(
post_id: int,
current_user: User = Depends(Security(get_current_active_user, scopes=["read"]))
):
for post in posts_db:
if post.id == post_id:
return post
raise HTTPException(status_code=404, detail="Post not found")
@app.put("/posts/{post_id}", response_model=Post)
async def update_post(
post_id: int,
post_update: PostCreate,
current_user: User = Depends(Security(get_current_active_user, scopes=["write"]))
):
for i, post in enumerate(posts_db):
if post.id == post_id:
# Check if the current user is the author
if post.author != current_user.username:
raise HTTPException(
status_code=403,
detail="You don't have permission to update this post"
)
updated_post = Post(
id=post_id,
author=current_user.username,
title=post_update.title,
content=post_update.content,
published=post_update.published
)
posts_db[i] = updated_post
return updated_post
raise HTTPException(status_code=404, detail="Post not found")
Best Practices for OAuth2 Implementation
-
Security First: Always use HTTPS in production to protect tokens in transit.
-
Token Expiry: Set appropriate expiration times for tokens. Short-lived access tokens with longer-lived refresh tokens are recommended.
-
Store Secrets Securely: Never hardcode your secret keys. Use environment variables or secret management tools.
# Instead of hardcoding:
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Use this approach:
import os
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
raise ValueError("No SECRET_KEY environment variable set")
-
Password Storage: Always hash passwords with strong algorithms (like bcrypt).
-
Token Storage: On the client side, store tokens securely (HTTP-only cookies for web applications).
-
Validate All Inputs: Don't trust client-provided data, even with authentication.
-
Implement Rate Limiting: Protect against brute force attacks on your token endpoints.
Summary
In this tutorial, we've covered:
- The basics of OAuth2 and its components
- How to implement OAuth2 password flow in FastAPI
- Creating and validating JWT tokens
- Implementing scopes for fine-grained access control
- Using refresh tokens for better security
- Building a real-world API with OAuth2 protection
- Best practices for secure OAuth2 implementation
OAuth2 provides a robust framework for securing your FastAPI applications. By following the patterns shown in this tutorial, you can implement secure authentication for your APIs while providing a good developer experience.
Additional Resources
- FastAPI Security Documentation
- OAuth 2.0 Specification
- JWT.io - Useful for debugging JWTs
- Python-jose Documentation
Exercises
-
Extend the blog API to include a "delete post" endpoint that checks if the current user is the author of the post.
-
Implement role-based access control where admin users can edit any posts.
-
Add an endpoint to revoke refresh tokens when a user logs out.
-
Implement token blacklisting to immediately invalidate tokens when needed.
-
Create an OAuth2 client credentials flow for machine-to-machine authentication.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)