Skip to main content

FastAPI OAuth2 Implementation

Introduction

OAuth2 is an industry-standard protocol for authorization that enables third-party applications to obtain limited access to a user's account on a server. In this tutorial, we'll explore how to implement OAuth2 authentication in FastAPI applications to secure your APIs properly.

FastAPI provides built-in support for OAuth2 through its security utilities, making it straightforward to implement various authentication flows. We'll focus on the password flow (also known as Resource Owner Password Credentials) and how to use JWT (JSON Web Tokens) for maintaining authenticated sessions.

Prerequisites

Before we begin, make sure you have:

  • Python 3.7+
  • FastAPI installed (pip install fastapi)
  • Uvicorn for running the server (pip install uvicorn)
  • Python-jose for JWT operations (pip install python-jose[cryptography])
  • Passlib for password hashing (pip install passlib[bcrypt])

Understanding OAuth2 Concepts

OAuth2 involves several key components:

  • Resource Owner: The user who owns the data (e.g., a user with an account)
  • Client: The application requesting access to the user's data
  • Authorization Server: The server that authenticates the user and issues tokens
  • Resource Server: The server that hosts the protected resources
  • Access Token: A credential used to access protected resources

In our implementation, FastAPI will act as both the Authorization Server and the Resource Server.

Basic OAuth2 Implementation with Password Flow

Let's start with a basic implementation of OAuth2 with password flow.

Step 1: Set up the necessary imports and create the app

python
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# Secret key for JWT signing - in production, use a secure random key
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI()

Step 2: Define the models for users and tokens

python
class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: Optional[str] = None

class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None

class UserInDB(User):
hashed_password: str

Step 3: Create a mock user database and utility functions

python
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Mock database of users
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "[email protected]",
"hashed_password": pwd_context.hash("secret123"),
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Johnson",
"email": "[email protected]",
"hashed_password": pwd_context.hash("password456"),
"disabled": True,
},
}

def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
return None

def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user

Step 4: Implement the JWT token creation function

python
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)

to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

Step 5: Create the function to get the current user from a token

python
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception

user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

Step 6: Create the token endpoint

python
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

Step 7: Create a protected endpoint

python
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user

@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]

Testing the OAuth2 Implementation

How to Test Your Implementation

You can test this implementation using the FastAPI automatic documentation interface (Swagger UI).

  1. Run your FastAPI application using Uvicorn:

    uvicorn main:app --reload
  2. Open your browser and go to http://127.0.0.1:8000/docs

  3. Click on the "Authorize" button at the top

  4. Enter your credentials (e.g., username: "johndoe", password: "secret123")

  5. Now you can access the protected endpoints

Expected Results

When you call the /users/me/ endpoint after authentication, you should see a response like:

json
{
"username": "johndoe",
"email": "[email protected]",
"full_name": "John Doe",
"disabled": false
}

When you call the /users/me/items/ endpoint, you'll see:

json
[
{
"item_id": "Foo",
"owner": "johndoe"
}
]

If you try to access these endpoints without authentication, you'll receive a 401 Unauthorized error.

Advanced OAuth2 Features

Scopes

OAuth2 supports scopes to limit the actions an authenticated user can perform. Let's add scopes to our implementation:

python
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"read": "Read access", "write": "Write access"}
)

@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

# Include scopes in the token
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}

# Endpoint that requires specific scope
@app.get("/items/")
async def read_items(
current_user: User = Depends(
Security(get_current_active_user, scopes=["read"])
)
):
return [{"item_id": 1, "owner": current_user.username}]

Refresh Tokens

To improve security while maintaining a good user experience, you can implement refresh tokens:

python
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str

# Add this function
def create_refresh_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=7) # Refresh tokens typically last longer
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

@app.post("/token", response_model=TokenPair)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)

access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)

refresh_token = create_refresh_token(data={"sub": user.username})

return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}

@app.post("/refresh", response_model=Token)
async def refresh_token(token: str):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)

try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception

# Create new access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
)

return {"access_token": access_token, "token_type": "bearer"}

Real-world Application: Protecting an API for a Blog

Let's implement a simple blog API with OAuth2 protection:

python
# Adding blog post models
class PostBase(BaseModel):
title: str
content: str
published: bool = True

class PostCreate(PostBase):
pass

class Post(PostBase):
id: int
author: str

# Mock database for posts
posts_db = []
post_id_counter = 0

@app.post("/posts/", response_model=Post)
async def create_post(
post: PostCreate,
current_user: User = Depends(Security(get_current_active_user, scopes=["write"]))
):
global post_id_counter
post_id_counter += 1

new_post = Post(
id=post_id_counter,
author=current_user.username,
title=post.title,
content=post.content,
published=post.published
)

posts_db.append(new_post)
return new_post

@app.get("/posts/", response_model=list[Post])
async def get_posts(
current_user: User = Depends(Security(get_current_active_user, scopes=["read"]))
):
return posts_db

@app.get("/posts/{post_id}", response_model=Post)
async def get_post(
post_id: int,
current_user: User = Depends(Security(get_current_active_user, scopes=["read"]))
):
for post in posts_db:
if post.id == post_id:
return post
raise HTTPException(status_code=404, detail="Post not found")

@app.put("/posts/{post_id}", response_model=Post)
async def update_post(
post_id: int,
post_update: PostCreate,
current_user: User = Depends(Security(get_current_active_user, scopes=["write"]))
):
for i, post in enumerate(posts_db):
if post.id == post_id:
# Check if the current user is the author
if post.author != current_user.username:
raise HTTPException(
status_code=403,
detail="You don't have permission to update this post"
)

updated_post = Post(
id=post_id,
author=current_user.username,
title=post_update.title,
content=post_update.content,
published=post_update.published
)

posts_db[i] = updated_post
return updated_post

raise HTTPException(status_code=404, detail="Post not found")

Best Practices for OAuth2 Implementation

  1. Security First: Always use HTTPS in production to protect tokens in transit.

  2. Token Expiry: Set appropriate expiration times for tokens. Short-lived access tokens with longer-lived refresh tokens are recommended.

  3. Store Secrets Securely: Never hardcode your secret keys. Use environment variables or secret management tools.

python
# Instead of hardcoding:
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"

# Use this approach:
import os
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
raise ValueError("No SECRET_KEY environment variable set")
  1. Password Storage: Always hash passwords with strong algorithms (like bcrypt).

  2. Token Storage: On the client side, store tokens securely (HTTP-only cookies for web applications).

  3. Validate All Inputs: Don't trust client-provided data, even with authentication.

  4. Implement Rate Limiting: Protect against brute force attacks on your token endpoints.

Summary

In this tutorial, we've covered:

  • The basics of OAuth2 and its components
  • How to implement OAuth2 password flow in FastAPI
  • Creating and validating JWT tokens
  • Implementing scopes for fine-grained access control
  • Using refresh tokens for better security
  • Building a real-world API with OAuth2 protection
  • Best practices for secure OAuth2 implementation

OAuth2 provides a robust framework for securing your FastAPI applications. By following the patterns shown in this tutorial, you can implement secure authentication for your APIs while providing a good developer experience.

Additional Resources

Exercises

  1. Extend the blog API to include a "delete post" endpoint that checks if the current user is the author of the post.

  2. Implement role-based access control where admin users can edit any posts.

  3. Add an endpoint to revoke refresh tokens when a user logs out.

  4. Implement token blacklisting to immediately invalidate tokens when needed.

  5. Create an OAuth2 client credentials flow for machine-to-machine authentication.



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)