FastAPI OAuth2 Implementation
Introduction
OAuth2 is an industry-standard protocol for authorization that enables third-party applications to obtain limited access to a user's account on a server. In this tutorial, we'll explore how to implement OAuth2 authentication in FastAPI applications to secure your APIs properly.
FastAPI provides built-in support for OAuth2 through its security utilities, making it straightforward to implement various authentication flows. We'll focus on the password flow (also known as Resource Owner Password Credentials) and how to use JWT (JSON Web Tokens) for maintaining authenticated sessions.
Prerequisites
Before we begin, make sure you have:
- Python 3.7+
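- FastAPI installed (`pip install fastapi`)
- Uvicorn for running the server (`pip install uvicorn`)
- Python-jose for JWT operations (`pip install python-jose[cryptography]`)
- Passlib for password hashing (`pip install passlib[bcrypt]`)
- Python-multipart for parsing the login form data that `OAuth2PasswordRequestForm` reads (`pip install python-multipart`)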
Understanding OAuth2 Concepts
OAuth2 involves several key components:
- Resource Owner: The user who owns the data (e.g., a user with an account)
- Client: The application requesting access to the user's data
- Authorization Server: The server that authenticates the user and issues tokens
- Resource Server: The server that hosts the protected resources
- Access Token: A credential used to access protected resources
In our implementation, FastAPI will act as both the Authorization Server and the Resource Server.
Basic OAuth2 Implementation with Password Flow
Let's start with a basic implementation of OAuth2 with password flow.
Step 1: Set up the necessary imports and create the app
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# Secret key used to sign JWTs (jwt.encode) and to verify them (jwt.decode).
# NOTE(review): this value is hard-coded for the tutorial only - in production,
# use a secure random key and load it from configuration rather than keeping
# it in source code.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Signing algorithm handed to jwt.encode and the only one accepted by jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# The FastAPI application object that the route decorators attach to.
app = FastAPI()
Step 2: Define the models for users and tokens
class Token(BaseModel):
    # Response body of the /token endpoint (used as its response_model).

    # The encoded JWT that the client presents on later requests.
    access_token: str
    # Token scheme; the /token endpoint in this file always returns "bearer".
    token_type: str
class TokenData(BaseModel):
    # Data extracted from a decoded access token.

    # Value of the token's "sub" claim; None when no username was supplied.
    username: Optional[str] = None
class User(BaseModel):
    # Public view of a user account (used as the response_model of /users/me/).

    # Unique login name; also the key of the record in the user database.
    username: str
    # Optional contact and profile details.
    email: Optional[str] = None
    full_name: Optional[str] = None
    # When truthy, get_current_active_user rejects the account as inactive.
    disabled: Optional[bool] = None
class UserInDB(User):
    # Stored form of a user: every User field plus the password hash.

    # Hash of the user's password, checked via verify_password at login.
    hashed_password: str
Step 3: Create a mock user database and utility functions
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token dependency used by get_current_user; tokenUrl="token" refers to
# the /token login endpoint defined in this file.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Mock database of users, keyed by username. Each record carries the fields of
# UserInDB, so it can be expanded directly into that model by get_user.
# The password hashes are computed when this module is imported, so the
# records themselves hold only hashes.
fake_users_db = {
    # Active account.
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": pwd_context.hash("secret123"),
        "disabled": False,
    },
    # Disabled account: can obtain a token, but get_current_active_user
    # rejects it with "Inactive user".
    "alice": {
        "username": "alice",
        "full_name": "Alice Johnson",
        "email": "alice@example.com",
        "hashed_password": pwd_context.hash("password456"),
        "disabled": True,
    },
}
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored hash.

    Delegates the comparison to the module-level ``pwd_context`` and returns
    whatever its ``verify`` call returns.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_user(db, username: str):
    """Look up ``username`` in ``db`` and return it as a ``UserInDB``.

    Args:
        db: Mapping of username to a dict of ``UserInDB`` fields.
        username: Key to look up.

    Returns:
        A ``UserInDB`` built from the stored record, or ``None`` when the
        username is not a key of ``db``.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
def authenticate_user(fake_db, username: str, password: str):
    """Validate a username/password pair against ``fake_db``.

    Args:
        fake_db: Mapping of username to user record, as accepted by ``get_user``.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The matching ``UserInDB`` when the user exists and the password
        verifies against its stored hash; ``False`` otherwise.
    """
    candidate = get_user(fake_db, username)
    # The password is only checked when the lookup found a user.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
Step 4: Implement the JWT token creation function
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is not modified.
        expires_delta: Token lifetime measured from now. When ``None``, a
            default lifetime of 15 minutes is used. An explicit zero-length
            delta is honoured rather than replaced by the default.

    Returns:
        The encoded token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Imported locally because the file-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy, and a truthiness
    # test would silently turn a zero lifetime into the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)

    # Timezone-aware "now" in UTC; datetime.utcnow() returns a naive value and
    # is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Step 5: Create the function to get the current user from a token
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token of the current request to a stored user.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The ``UserInDB`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            fails to decode, carries no ``sub`` claim, or names a user that
            is not in ``fake_users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)

    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return the authenticated user, rejecting disabled accounts.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``disabled`` flag is falsy.

    Raises:
        HTTPException: 400 "Inactive user" when ``disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
Step 6: Create the token endpoint
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    # Login endpoint: exchanges the username/password from the submitted form
    # for a bearer access token.
    account = authenticate_user(
        fake_users_db, form_data.username, form_data.password
    )
    if not account:
        # Same 401 response whether the username or the password was wrong.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The username is stored in the "sub" claim, which get_current_user reads
    # back when the token is presented.
    token = create_access_token(
        data={"sub": account.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {"access_token": token, "token_type": "bearer"}
Step 7: Create protected endpoints
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    # Protected endpoint: the dependency has already authenticated the request
    # and rejected disabled accounts, so the user is returned as-is and
    # serialized through the User response model.
    return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    # Protected endpoint returning a single hard-coded item whose owner is the
    # authenticated user's username.
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]