FastAPI Redis Integration
Introduction
Redis (Remote Dictionary Server) is an open-source, in-memory data structure store that can be used as a database, cache, message broker, and more. When integrated with FastAPI, Redis provides powerful capabilities for improving application performance, managing sessions, implementing rate limiting, and handling real-time data.
In this tutorial, we'll explore how to integrate Redis with your FastAPI applications. We'll cover:
- Setting up Redis with FastAPI
- Using Redis for caching API responses
- Implementing rate limiting
- Storing and retrieving session data
- Building a simple real-time notification system
Prerequisites
Before we begin, you should have:
- Basic knowledge of FastAPI
- Python 3.7+ installed
- Redis server installed (or access to a Redis instance)
- Understanding of async programming concepts
Setting Up Redis with FastAPI
Installation
First, let's install the required packages:
```bash
pip install fastapi uvicorn redis
```
For asynchronous Redis operations, we'll use aioredis. (Note that aioredis has since been merged into redis-py; on redis-py 4.2+ the same API is available as `redis.asyncio`, so `import redis.asyncio as aioredis` also works.)

```bash
pip install aioredis
```
Creating a Redis Connection
Let's create a basic FastAPI application with Redis integration:
```python
from fastapi import FastAPI, Depends, HTTPException
import aioredis

app = FastAPI(title="FastAPI Redis Integration")

# Redis connection dependency
async def get_redis():
    redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    try:
        yield redis
    finally:
        await redis.close()

@app.get("/")
async def root():
    return {"message": "FastAPI Redis Integration"}
```
Here, we've created a dependency `get_redis()` that establishes a connection to a Redis server running on localhost. This dependency can be injected into any API endpoint that needs to interact with Redis.
Implementing Caching with Redis
One of the most common use cases for Redis is caching API responses to improve performance.
Basic Response Caching
Let's create a simple caching mechanism for an API endpoint:
```python
from fastapi import Path
import asyncio
import json

@app.get("/items/{item_id}")
async def get_item(item_id: int = Path(...), redis = Depends(get_redis)):
    # Try to get the data from the cache first
    cached_item = await redis.get(f"item:{item_id}")
    if cached_item:
        return json.loads(cached_item)

    # If not in cache, "fetch" from the database (simulated).
    # Use asyncio.sleep, not time.sleep, so the event loop isn't blocked.
    await asyncio.sleep(1)  # Simulate database query delay
    item = {"id": item_id, "name": f"Item {item_id}", "description": "This is an expensive database call"}

    # Store in cache for future requests (expire after 60 seconds)
    await redis.set(f"item:{item_id}", json.dumps(item), ex=60)
    return item
```
When a user requests an item, the API first checks if the item exists in Redis. If it does, it returns the cached data immediately. If not, it simulates fetching the data from a database (slow operation), then caches the result in Redis for future requests.
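This flow is the classic cache-aside pattern, and it can be sketched independently of Redis. Below is a minimal in-memory stand-in, assuming a plain dict with expiry timestamps in place of Redis `GET`/`SET ... EX`; the `TTLCache` name is illustrative, not a library API:

```python
import time

class TTLCache:
    """Minimal in-memory stand-in for Redis GET / SET ... EX semantics."""

    def __init__(self):
        self._store = {}  # key -> (value, expires_at)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazily evict the expired entry
            return None
        return value

    def set(self, key, value, ex):
        self._store[key] = (value, time.monotonic() + ex)

def get_item_cached(cache, item_id, fetch_from_db):
    """Cache-aside: return the cached value, else fetch, store, and return."""
    cached = cache.get(f"item:{item_id}")
    if cached is not None:
        return cached, True  # cache hit
    item = fetch_from_db(item_id)
    cache.set(f"item:{item_id}", item, ex=60)
    return item, False  # cache miss

cache = TTLCache()
fetch = lambda i: {"id": i, "name": f"Item {i}"}
print(get_item_cached(cache, 1, fetch))  # first call: cache miss
print(get_item_cached(cache, 1, fetch))  # second call: cache hit
```

The Redis version above behaves the same way, with the added benefit that the cache is shared across processes and survives application restarts.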
Creating a Reusable Caching Decorator
Let's create a more reusable caching decorator:
```python
import functools
import json
from fastapi import Request

def cache_response(expire_seconds: int = 60):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Find the Request object among the arguments.
            # FastAPI passes endpoint parameters as keyword arguments,
            # so we must search kwargs as well as args.
            request = next(
                (a for a in list(args) + list(kwargs.values()) if isinstance(a, Request)),
                None,
            )
            if request is None:
                raise ValueError("Request object not found in function arguments")

            # Build the cache key from the function name and its
            # non-Request parameters. (Including the Request's repr would
            # make every key unique and defeat the cache.)
            key_params = {k: v for k, v in kwargs.items() if not isinstance(v, Request)}
            cache_key = f"cache:{func.__name__}:{json.dumps(key_params, sort_keys=True, default=str)}"

            # Get the Redis connection stored on the application state
            redis = request.app.state.redis

            # Try the cache first
            cached_response = await redis.get(cache_key)
            if cached_response:
                return json.loads(cached_response)

            # Cache miss: execute the original endpoint
            response = await func(*args, **kwargs)

            # Cache the response for future requests
            await redis.set(cache_key, json.dumps(response), ex=expire_seconds)
            return response
        return wrapper
    return decorator
```
This decorator is a bit more complex, but it allows us to cache the response of any async endpoint. Here's how to use it:
```python
@app.on_event("startup")
async def startup_event():
    app.state.redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.redis.close()

@app.get("/users/{user_id}")
@cache_response(expire_seconds=30)
async def get_user(request: Request, user_id: int):
    # Simulate a slow database query without blocking the event loop
    await asyncio.sleep(2)
    return {"user_id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}
```
Implementing Rate Limiting
Redis is perfect for implementing rate limiting to prevent abuse of your API.
```python
from fastapi import Header, Request, HTTPException

@app.get("/limited-endpoint")
async def limited_endpoint(
    request: Request,
    user_agent: str = Header(...),
    redis = Depends(get_redis)
):
    # Create a key using the client's IP and user agent
    client_key = f"ratelimit:{request.client.host}:{user_agent}"

    # Get the current count for this client
    count = await redis.incr(client_key)

    # Set the expiry if this is the first request in the window
    if count == 1:
        await redis.expire(client_key, 60)  # Reset count after 60 seconds

    # Check whether the limit is exceeded
    if count > 10:
        raise HTTPException(
            status_code=429,
            detail="Too many requests. Please try again later."
        )

    return {
        "message": "This is a rate-limited endpoint",
        "requests_remaining": 10 - count
    }
```
This endpoint allows only 10 requests per minute per client (based on IP address and user agent).
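The `INCR` + `EXPIRE` logic above is a fixed-window counter. Its behavior can be sketched in memory (a rough stand-in for the Redis commands, useful for reasoning and unit testing; the Redis version is what you'd deploy, since the counter must be shared across workers):

```python
import time

class FixedWindowLimiter:
    """In-memory sketch of the INCR + EXPIRE fixed-window pattern."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window = window_seconds
        self._counters = {}  # key -> (count, window_expires_at)

    def hit(self, key, now=None):
        """Register one request; return (allowed, requests_remaining)."""
        now = time.monotonic() if now is None else now
        count, expires_at = self._counters.get(key, (0, now + self.window))
        if now >= expires_at:
            # Window elapsed: the counter resets, like EXPIRE firing in Redis
            count, expires_at = 0, now + self.window
        count += 1  # the INCR equivalent
        self._counters[key] = (count, expires_at)
        allowed = count <= self.limit
        return allowed, max(self.limit - count, 0)

limiter = FixedWindowLimiter(limit=3, window_seconds=60)
print([limiter.hit("client-a", now=0.0) for _ in range(4)])
# The 4th call in the same window is rejected; after 60s the window resets
```

One subtlety of the Redis version: if the process dies between `INCR` and `EXPIRE` on the very first request, the key never expires. Wrapping both commands in a pipeline (or using a Lua script) closes that gap.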
Session Management with Redis
Redis is excellent for session management. Let's create a simple session system:
```python
import time
import uuid
from fastapi import Cookie, Response
from typing import Optional

@app.post("/login")
async def login(username: str, response: Response, redis = Depends(get_redis)):
    # Generate a session ID
    session_id = str(uuid.uuid4())

    # Store user data in Redis with the session ID as the key
    await redis.hset(f"session:{session_id}", mapping={
        "username": username,
        "login_time": time.time()
    })

    # Set session expiration (1 hour)
    await redis.expire(f"session:{session_id}", 3600)

    # Set the session cookie on the response
    response.set_cookie(key="session_id", value=session_id, httponly=True)
    return {"message": "Login successful"}

@app.get("/profile")
async def get_profile(
    session_id: Optional[str] = Cookie(None),
    redis = Depends(get_redis)
):
    if not session_id:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Get the user data from Redis
    user_data = await redis.hgetall(f"session:{session_id}")
    if not user_data:
        raise HTTPException(status_code=401, detail="Invalid or expired session")

    return {
        "username": user_data.get("username"),
        "logged_in_since": float(user_data.get("login_time", 0))
    }

@app.post("/logout")
async def logout(
    response: Response,
    session_id: Optional[str] = Cookie(None),
    redis = Depends(get_redis)
):
    if session_id:
        # Delete the session from Redis
        await redis.delete(f"session:{session_id}")

    # Clear the cookie
    response.delete_cookie(key="session_id")
    return {"message": "Logout successful"}
```
This implementation provides a basic but functional session management system using Redis.
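The store's behavior, stripped of the HTTP layer, reduces to three operations: create (`HSET` + `EXPIRE`), look up (`HGETALL`), and delete (`DEL`). Here is a minimal in-memory sketch of that contract (the `SessionStore` name is illustrative; the Redis version above is what actually shares sessions across workers):

```python
import time
import uuid

class SessionStore:
    """In-memory sketch of the session flow: HSET + EXPIRE on login,
    HGETALL on lookup, DEL on logout."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self._sessions = {}  # session_id -> (data dict, expires_at)

    def create(self, username, now=None):
        now = time.monotonic() if now is None else now
        session_id = str(uuid.uuid4())
        self._sessions[session_id] = (
            {"username": username, "login_time": now},
            now + self.ttl,
        )
        return session_id

    def get(self, session_id, now=None):
        now = time.monotonic() if now is None else now
        entry = self._sessions.get(session_id)
        if entry is None:
            return None
        data, expires_at = entry
        if now >= expires_at:  # the EXPIRE equivalent
            del self._sessions[session_id]
            return None
        return data

    def delete(self, session_id):
        self._sessions.pop(session_id, None)

store = SessionStore(ttl_seconds=3600)
sid = store.create("alice", now=0.0)
print(store.get(sid, now=10.0))    # session data while the TTL is live
print(store.get(sid, now=4000.0))  # None once the TTL has elapsed
```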
Building a Real-Time Notification System
Redis's pub/sub mechanism makes it a natural fit for real-time applications. Let's build a simple notification system:
```python
import asyncio
from fastapi import WebSocket, WebSocketDisconnect
from asyncio import Queue
from contextlib import asynccontextmanager

# Store active websocket queues in memory
active_connections = {}

# Subscribe to the Redis notifications channel
@asynccontextmanager
async def redis_subscriber():
    redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    pubsub = redis.pubsub()
    await pubsub.subscribe("notifications")
    try:
        yield pubsub
    finally:
        await pubsub.unsubscribe("notifications")
        await redis.close()
```
```python
# Background task that listens for published messages
@app.on_event("startup")
async def start_pubsub_listener():
    asyncio.create_task(listen_to_redis())

async def listen_to_redis():
    async with redis_subscriber() as pubsub:
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True)
            if message:
                data = message.get("data")
                # Broadcast to all connected clients
                for user_id, queue in active_connections.items():
                    await queue.put(data)
            await asyncio.sleep(0.01)  # Small sleep to avoid hogging the CPU

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await websocket.accept()

    # Create a queue for this connection
    queue = Queue()
    active_connections[user_id] = queue

    try:
        # Welcome message
        await websocket.send_text(f"Connected as user {user_id}")

        # Keep the connection open and watch for messages
        while True:
            # Check for new messages from Redis
            try:
                data = await asyncio.wait_for(queue.get(), timeout=1.0)
                await websocket.send_text(data)
            except asyncio.TimeoutError:
                # No message; send a ping to keep the connection alive
                await websocket.send_text("ping")

            # Check for incoming messages from the client
            try:
                data = await asyncio.wait_for(websocket.receive_text(), timeout=0.1)
                # Do something with the received data if needed
                print(f"Received from {user_id}: {data}")
            except asyncio.TimeoutError:
                # No client message this cycle. A WebSocketDisconnect
                # propagates to the outer handler below, so we must not
                # swallow it here.
                pass
    except WebSocketDisconnect:
        # Clean up on disconnect
        active_connections.pop(user_id, None)

@app.post("/send-notification")
async def send_notification(message: str, redis = Depends(get_redis)):
    # Publish the message to the Redis channel
    await redis.publish("notifications", message)
    return {"status": "Message sent"}
```
This system:
- Creates a WebSocket endpoint for clients to connect to
- Subscribes to a Redis channel for notifications
- When a notification is received, it's sent to all connected clients
- Provides an API endpoint to send notifications
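The fan-out at the heart of this system can be sketched without Redis: one published message is copied into every subscriber's queue. The following in-memory `Broker` (an illustrative name, not a library class) mirrors what `PUBLISH`/`SUBSCRIBE` plus the per-connection queues above accomplish:

```python
import asyncio

class Broker:
    """In-memory sketch of the fan-out: one published message is
    delivered to every subscriber's queue (Redis PUBLISH / SUBSCRIBE)."""

    def __init__(self):
        self._subscribers = {}  # user_id -> asyncio.Queue

    def subscribe(self, user_id):
        queue = asyncio.Queue()
        self._subscribers[user_id] = queue
        return queue

    def unsubscribe(self, user_id):
        self._subscribers.pop(user_id, None)

    async def publish(self, message):
        for queue in self._subscribers.values():
            await queue.put(message)

async def main():
    broker = Broker()
    q1 = broker.subscribe("user-1")
    q2 = broker.subscribe("user-2")
    await broker.publish("new product launched!")
    return await q1.get(), await q2.get()

print(asyncio.run(main()))  # the same message reaches both subscribers
```

Redis adds what this sketch lacks: the publisher and the subscribers can live in different processes or on different machines.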
Complete Example: E-commerce Product Cache
Let's put together a more complete example for an e-commerce site that uses Redis for product caching:
```python
from fastapi import FastAPI, Depends, HTTPException, Path, Query
import aioredis
import asyncio
import json
from typing import Optional, Dict, Any

app = FastAPI(title="FastAPI Redis E-Commerce Example")

# Redis connection dependency
async def get_redis():
    redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )
    try:
        yield redis
    finally:
        await redis.close()

# Simulated database (in a real app, this would be a real database)
products_db = {
    1: {"id": 1, "name": "Laptop", "price": 999.99, "stock": 50},
    2: {"id": 2, "name": "Smartphone", "price": 499.99, "stock": 100},
    3: {"id": 3, "name": "Headphones", "price": 149.99, "stock": 75},
}

# Function to simulate database delay
async def get_from_db(product_id: int) -> Optional[Dict[str, Any]]:
    await asyncio.sleep(1)  # Simulate database query delay
    return products_db.get(product_id)
```
```python
@app.get("/products/{product_id}")
async def get_product(product_id: int = Path(...), redis = Depends(get_redis)):
    # Try the cache first
    cache_key = f"product:{product_id}"
    cached_product = await redis.get(cache_key)
    if cached_product:
        print(f"Cache hit for product {product_id}")
        return json.loads(cached_product)

    print(f"Cache miss for product {product_id}, fetching from DB")
    product = await get_from_db(product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Cache the product with a 5 minute expiration
    await redis.set(cache_key, json.dumps(product), ex=300)
    return product
```
```python
@app.get("/products")
async def list_products(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    redis = Depends(get_redis)
):
    # Try the cache first
    cache_key = f"products:list:{page}:{per_page}"
    cached_products = await redis.get(cache_key)
    if cached_products:
        print(f"Cache hit for product list page {page}")
        return json.loads(cached_products)

    print(f"Cache miss for product list page {page}, fetching from DB")

    # Simulate fetching from the database with pagination
    await asyncio.sleep(1)  # Simulate database delay

    # Calculate pagination bounds
    start_idx = (page - 1) * per_page
    end_idx = start_idx + per_page

    # Get the products for this page
    product_ids = list(products_db.keys())[start_idx:end_idx]
    products = [products_db[pid] for pid in product_ids]

    result = {
        "page": page,
        "per_page": per_page,
        "total": len(products_db),
        "products": products
    }

    # Cache the result with a 2 minute expiration
    await redis.set(cache_key, json.dumps(result), ex=120)
    return result
```
```python
@app.post("/products/{product_id}/update")
async def update_product(
    product_id: int,
    name: Optional[str] = None,
    price: Optional[float] = None,
    stock: Optional[int] = None,
    redis = Depends(get_redis)
):
    # Check that the product exists
    if product_id not in products_db:
        raise HTTPException(status_code=404, detail="Product not found")

    # Update the product in the "database"
    product = products_db[product_id]
    if name is not None:
        product["name"] = name
    if price is not None:
        product["price"] = price
    if stock is not None:
        product["stock"] = stock

    # Invalidate the cached product
    await redis.delete(f"product:{product_id}")

    # Invalidate the product list caches
    cache_keys = await redis.keys("products:list:*")
    if cache_keys:
        await redis.delete(*cache_keys)

    return {"message": "Product updated", "product": product}
```
```python
@app.on_event("startup")
async def startup_event():
    app.state.redis = await aioredis.from_url(
        "redis://localhost", encoding="utf-8", decode_responses=True
    )

    # Prepopulate the cache with products at startup
    for product_id, product in products_db.items():
        await app.state.redis.set(f"product:{product_id}", json.dumps(product), ex=300)
    print("Initial product cache populated")

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.redis.close()
```
This example demonstrates:
- Caching individual product details
- Caching paginated product listings
- Cache invalidation when products are updated
- Preloading cache on application startup
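One caveat on the invalidation step: `KEYS` scans the entire keyspace in a single blocking call, which can stall a busy Redis server; in production, `scan_iter` walks the keyspace incrementally instead. The pattern-matching logic itself can be sketched in memory with `fnmatch` (an illustrative stand-in, not the redis client API):

```python
from fnmatch import fnmatch

def invalidate_pattern(store, pattern):
    """Delete every key matching a glob pattern; an in-memory mirror of
    iterating `redis.scan_iter("products:list:*")` and deleting each key."""
    doomed = [key for key in store if fnmatch(key, pattern)]
    for key in doomed:
        del store[key]
    return doomed

store = {
    "products:list:1:10": "...",
    "products:list:2:10": "...",
    "product:1": "...",
}
removed = invalidate_pattern(store, "products:list:*")
print(sorted(removed))  # both cached list pages are removed
print(list(store))      # the individual product cache entry remains
```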
Summary
In this tutorial, we explored how to integrate Redis with FastAPI applications. We covered:
- Setting up Redis connections in FastAPI
- Implementing response caching for improved performance
- Building a rate limiting system to protect your API
- Creating a session management system with Redis
- Developing a real-time notification system using Redis pub/sub
- Building a complete e-commerce product caching solution
Redis and FastAPI make a powerful combination for building high-performance web applications. Redis helps solve common web development challenges like caching, session management, and real-time features, while FastAPI provides a robust, type-safe framework for building APIs.
Additional Resources
- Redis Documentation
- aioredis Documentation
- FastAPI Documentation
- Redis University - Free Redis courses
Exercises
- Basic: Modify the product caching example to include category filtering and ensure proper cache invalidation.
- Intermediate: Implement a distributed locking mechanism using Redis to prevent race conditions when multiple users try to update the same resource simultaneously.
- Advanced: Create a job queue system using Redis Lists where FastAPI endpoints can add background jobs that are processed by worker processes.
- Challenge: Implement a user analytics system that tracks page views and user activities using Redis Sorted Sets to maintain real-time leaderboards of popular products.
By integrating Redis with your FastAPI applications, you can significantly improve performance and scalability and unlock powerful features like real-time notifications and distributed caching. The in-memory nature of Redis makes it perfect for situations where low latency is critical.