FastAPI WebSocket Connections
WebSockets provide a powerful way to create interactive, real-time applications. In this guide, we'll explore how FastAPI handles WebSocket connections and how you can use them to build responsive, two-way communication systems.
Introduction to WebSocket Connections in FastAPI
WebSockets establish a persistent connection between a client (usually a web browser) and a server, allowing for continuous two-way communication. Unlike traditional HTTP requests, which follow a request-response pattern, WebSockets maintain an open connection, enabling instant data exchange without repeated connection overhead.
FastAPI makes it easy to implement WebSocket connections with its intuitive API, building on top of Python's async capabilities to handle concurrent connections efficiently.
Basic WebSocket Connection
Let's start with a simple example of establishing a WebSocket connection in FastAPI:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Accept the connection
    await websocket.accept()
    try:
        # Loop until the client disconnects
        while True:
            # Receive message from client
            data = await websocket.receive_text()
            # Echo the message back to client
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        # Raised by receive_text() when the client closes the connection
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Runs on every exit path
        print("WebSocket connection closed")
In this example:
- We define a WebSocket endpoint at /ws
- We accept the connection when a client connects
- We enter a loop that receives messages and echoes them back
- We handle any exceptions and disconnection
Client-Side Connection
To connect to this WebSocket server from a web browser, you can use JavaScript:
// Create a WebSocket connection
const socket = new WebSocket("ws://localhost:8000/ws");

// Connection opened
socket.addEventListener("open", (event) => {
    console.log("Connection established");
    socket.send("Hello Server!");
});

// Listen for messages
socket.addEventListener("message", (event) => {
    console.log("Message from server:", event.data);
});

// Connection closed
socket.addEventListener("close", (event) => {
    console.log("Connection closed");
});
Managing WebSocket Connections
In real-world applications, you'll typically need to manage multiple WebSocket connections. FastAPI does not ship a built-in manager for this, but a small connection manager class of your own covers the common cases:
from typing import Dict, Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        # Store active connections, keyed by client ID
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        # Accept connection
        await websocket.accept()
        # Store connection with its ID
        self.active_connections[client_id] = websocket

    def disconnect(self, client_id: str):
        # Remove connection when client disconnects
        self.active_connections.pop(client_id, None)

    async def send_personal_message(self, message: str, client_id: str):
        # Send message to specific client
        if client_id in self.active_connections:
            await self.active_connections[client_id].send_text(message)

    async def broadcast(self, message: str, exclude: Optional[str] = None):
        # Send message to all connected clients (except excluded).
        # Iterate over a copy so that a client connecting or leaving
        # during an await cannot change the dict mid-iteration.
        for client_id, connection in list(self.active_connections.items()):
            if client_id != exclude:
                await connection.send_text(message)


# Create a connection manager instance
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    # Connect to the manager
    await manager.connect(websocket, client_id)
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            # Send personal message
            await manager.send_personal_message(f"You sent: {data}", client_id)
            # Broadcast to others
            await manager.broadcast(f"Client {client_id} says: {data}", exclude=client_id)
    except WebSocketDisconnect:
        # Handle disconnection
        manager.disconnect(client_id)
        # Announce disconnection
        await manager.broadcast(f"Client {client_id} left the chat")
This connection manager provides:
- Client identification with unique IDs
- Personal messaging to specific clients
- Broadcasting to all connected clients
- Proper handling of disconnections
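One gap in a broadcast loop like the one above is that a single failed send stops delivery to everyone after it. The following is a minimal sketch of a more tolerant broadcast, not part of the original manager: safe_broadcast, TextSender, and FakeSocket are hypothetical names, and the helper only assumes each connection has an async send_text() method, as FastAPI's WebSocket does.

```python
import asyncio
from typing import Dict, List, Optional, Protocol


class TextSender(Protocol):
    """Anything with an async send_text(), such as FastAPI's WebSocket."""

    async def send_text(self, data: str) -> None: ...


async def safe_broadcast(
    connections: Dict[str, TextSender],
    message: str,
    exclude: Optional[str] = None,
) -> List[str]:
    """Send to every client except `exclude`; drop and return the IDs whose send failed."""
    failed: List[str] = []
    # Iterate over a snapshot so the dict can change while we await.
    for client_id, connection in list(connections.items()):
        if client_id == exclude:
            continue
        try:
            await connection.send_text(message)
        except Exception:
            # Treat any send error as a dead connection.
            failed.append(client_id)
    for client_id in failed:
        connections.pop(client_id, None)
    return failed


class FakeSocket:
    """Hypothetical stand-in used only to exercise the helper without a server."""

    def __init__(self, broken: bool = False):
        self.broken = broken
        self.sent: List[str] = []

    async def send_text(self, data: str) -> None:
        if self.broken:
            raise RuntimeError("connection already closed")
        self.sent.append(data)
```

Called as asyncio.run(safe_broadcast(connections, "hi")), it returns the IDs that were removed, which the caller could use to announce departures.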
Advanced WebSocket Functionality
Connection Lifecycle Events
FastAPI supports handling different stages of the WebSocket connection lifecycle:
@app.websocket("/ws/lifecycle")
async def websocket_lifecycle(websocket: WebSocket):
    # Connection request handling
    print("Connection request received")
    await websocket.accept()
    print("Connection accepted")
    try:
        while True:
            # receive() returns the raw ASGI message as a dict
            message = await websocket.receive()
            if message["type"] == "websocket.disconnect":
                # Unlike receive_text(), receive() reports a disconnect
                # as a message instead of raising WebSocketDisconnect
                print(f"Client disconnected with code: {message.get('code')}")
                break
            if message.get("text") is not None:
                # Handle text message (JSON also arrives as text)
                text = message["text"]
                await websocket.send_text(f"You sent: {text}")
            elif message.get("bytes") is not None:
                # Handle binary message
                binary = message["bytes"]
                await websocket.send_bytes(binary)
    finally:
        # Clean-up code
        print("Connection closed")
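The raw message returned by websocket.receive() is an ASGI event dictionary: a "websocket.receive" event carries either a text or a bytes payload, and a "websocket.disconnect" event carries a close code. There is no separate JSON frame type, so JSON arrives as text. A small sketch of a classifier for these dictionaries follows; classify_message is a hypothetical helper, and treating only JSON objects and arrays as "json" is an assumption made for illustration.

```python
import json
from typing import Any, Dict, Tuple


def classify_message(message: Dict[str, Any]) -> Tuple[str, Any]:
    """Map a raw ASGI WebSocket event to a (kind, payload) pair."""
    if message["type"] == "websocket.disconnect":
        return "disconnect", message.get("code")
    text = message.get("text")
    if text is not None:
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return "text", text
        # Only objects and arrays count as JSON here; bare scalars stay text.
        if isinstance(parsed, (dict, list)):
            return "json", parsed
        return "text", text
    data = message.get("bytes")
    if data is not None:
        return "bytes", data
    return "unknown", None
```

A handler could branch on the returned kind and break out of its receive loop on "disconnect".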
WebSocket Authorization
You can implement authorization for WebSocket connections using dependencies:
from typing import Optional

from fastapi import Depends, Header, WebSocketException, status

API_KEY = "your-secret-api-key"


async def get_api_key(x_api_key: Optional[str] = Header(default=None)):
    # Header() reads the X-API-Key header (underscores become hyphens),
    # so the dependency works on a WebSocket route
    if x_api_key != API_KEY:
        # Reject with a WebSocket close code rather than an HTTP error
        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION,
            reason="Invalid API Key",
        )
    return x_api_key


@app.websocket("/ws/secure")
async def secure_websocket(
    websocket: WebSocket,
    api_key: str = Depends(get_api_key)
):
    await websocket.accept()
    await websocket.send_text("Successfully connected to secure WebSocket!")
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Secure echo: {data}")
    except WebSocketDisconnect:
        print("Secure WebSocket disconnected")
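Comparing secrets with != or == can, in principle, leak timing information about how many leading characters matched. As a hedged sketch, the key check could use the standard library's hmac.compare_digest instead; api_key_matches is a hypothetical helper name, not something FastAPI provides.

```python
import hmac
from typing import Optional


def api_key_matches(supplied: Optional[str], expected: str) -> bool:
    """Compare an API key in constant time; a missing key never matches."""
    if supplied is None:
        return False
    # Encode to bytes so non-ASCII input is handled the same way as ASCII.
    return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
```

Inside the dependency, the condition would then read: if not api_key_matches(api_key, API_KEY).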
Real-World Example: Chat Application
Let's build a simple chat application that demonstrates a practical use case for WebSockets:
import json
from typing import Dict

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

app = FastAPI()
templates = Jinja2Templates(directory="templates")


class ChatConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.user_names: Dict[str, str] = {}

    async def connect(self, websocket: WebSocket, user_id: str, username: str):
        # The endpoint has already accepted the connection; calling
        # accept() a second time here would raise an error
        self.active_connections[user_id] = websocket
        self.user_names[user_id] = username
        # Send connected users list
        users = list(self.user_names.values())
        await self.broadcast_json({
            "type": "users_update",
            "users": users
        })
        # Send welcome message
        await self.broadcast_json({
            "type": "chat_message",
            "user": "System",
            "message": f"{username} has joined the chat"
        })

    def disconnect(self, user_id: str):
        username = self.user_names.pop(user_id, "Unknown")
        self.active_connections.pop(user_id, None)
        return username

    async def broadcast_json(self, message_dict: dict):
        # Iterate over a copy in case the dict changes during an await
        for connection in list(self.active_connections.values()):
            await connection.send_json(message_dict)


chat_manager = ChatConnectionManager()


@app.get("/", response_class=HTMLResponse)
async def get(request: Request):
    return templates.TemplateResponse("chat.html", {"request": request})


@app.websocket("/ws/chat/{user_id}")
async def websocket_chat_endpoint(websocket: WebSocket, user_id: str):
    # Accept, then wait for the initial message carrying the username
    await websocket.accept()
    initial_data = await websocket.receive_text()
    user_data = json.loads(initial_data)
    username = user_data.get("username", f"User-{user_id}")
    # Register the connection
    await chat_manager.connect(websocket, user_id, username)
    try:
        while True:
            # Receive message
            data = await websocket.receive_text()
            message_data = json.loads(data)
            # Broadcast the message
            await chat_manager.broadcast_json({
                "type": "chat_message",
                "user": username,
                "message": message_data.get("message", "")
            })
    except WebSocketDisconnect:
        # Handle disconnection
        username = chat_manager.disconnect(user_id)
        # Announce disconnection
        await chat_manager.broadcast_json({
            "type": "chat_message",
            "user": "System",
            "message": f"{username} has left the chat"
        })
        # Update users list
        users = list(chat_manager.user_names.values())
        await chat_manager.broadcast_json({
            "type": "users_update",
            "users": users
        })
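The chat endpoint passes each incoming frame straight to json.loads, so one malformed frame would raise an exception and end the handler. Below is a minimal sketch of a defensive parser. The helper name parse_chat_message and the 500-character limit are assumptions made for illustration, not part of the original example.

```python
import json
from typing import Optional

MAX_MESSAGE_LENGTH = 500  # assumed limit, chosen only for illustration


def parse_chat_message(raw: str) -> Optional[str]:
    """Return the trimmed message text, or None if the frame is unusable."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # The client is expected to send a JSON object with a "message" string.
    if not isinstance(payload, dict):
        return None
    message = payload.get("message")
    if not isinstance(message, str):
        return None
    message = message.strip()
    if not message or len(message) > MAX_MESSAGE_LENGTH:
        return None
    return message
```

In the receive loop, a None result could be skipped with continue rather than broadcast.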
Here's a simplified HTML template for the chat application (chat.html):
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>FastAPI Chat</title>
    <style>
        #chat-container { width: 800px; margin: 0 auto; }
        #chat-messages { height: 400px; border: 1px solid #ccc; overflow-y: scroll; padding: 10px; margin-bottom: 10px; }
        #user-list { float: right; width: 150px; border: 1px solid #ccc; padding: 10px; min-height: 200px; }
        #message-input { width: 80%; padding: 5px; }
        .system-message { color: #888; font-style: italic; }
        .user-message strong { color: #0066cc; }
    </style>
</head>
<body>
    <div id="chat-container">
        <h1>FastAPI WebSocket Chat</h1>
        <div id="user-list">
            <h3>Online Users</h3>
            <ul id="users"></ul>
        </div>
        <div id="chat-messages"></div>
        <div>
            <input id="message-input" type="text" placeholder="Type a message...">
            <button onclick="sendMessage()">Send</button>
        </div>
    </div>
    <script>
        // Generate a random user ID
        const userId = 'user-' + Math.floor(Math.random() * 1000000);
        let username = prompt("Enter your username:") || "Anonymous";
        let socket;

        function connectWebSocket() {
            // Create WebSocket connection
            socket = new WebSocket(`ws://localhost:8000/ws/chat/${userId}`);

            // Connection opened
            socket.addEventListener('open', function(event) {
                console.log('Connected to WebSocket server');
                // Send initial message with username
                socket.send(JSON.stringify({
                    type: 'connect',
                    username: username
                }));
            });

            // Listen for messages
            socket.addEventListener('message', function(event) {
                const data = JSON.parse(event.data);
                if (data.type === 'chat_message') {
                    displayMessage(data.user, data.message);
                } else if (data.type === 'users_update') {
                    updateUsersList(data.users);
                }
            });

            // Connection closed
            socket.addEventListener('close', function(event) {
                console.log('Disconnected from server');
                // Try to reconnect after a delay
                setTimeout(connectWebSocket, 3000);
            });

            // Connection error
            socket.addEventListener('error', function(event) {
                console.error('WebSocket Error:', event);
            });
        }

        function sendMessage() {
            const messageInput = document.getElementById('message-input');
            const message = messageInput.value.trim();
            if (message && socket.readyState === WebSocket.OPEN) {
                socket.send(JSON.stringify({
                    type: 'message',
                    message: message
                }));
                messageInput.value = '';
            }
        }
        function displayMessage(user, message) {
            const messagesDiv = document.getElementById('chat-messages');
            const messageElement = document.createElement('p');
            if (user === 'System') {
                messageElement.className = 'system-message';
                // textContent keeps chat text from being parsed as HTML
                messageElement.textContent = message;
            } else {
                messageElement.className = 'user-message';
                const name = document.createElement('strong');
                name.textContent = `${user}: `;
                messageElement.appendChild(name);
                messageElement.appendChild(document.createTextNode(message));
            }
            messagesDiv.appendChild(messageElement);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
        function updateUsersList(users) {
            const usersList = document.getElementById('users');
            usersList.innerHTML = '';
            users.forEach(user => {
                const userItem = document.createElement('li');
                userItem.textContent = user;
                usersList.appendChild(userItem);
            });
        }

        // Enter key to send message
        document.getElementById('message-input').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });

        // Start connection
        connectWebSocket();
    </script>
</body>
</html>
This chat application demonstrates:
- WebSocket connection management
- Handling multiple clients
- Broadcasting messages
- Maintaining a list of connected users
- Handling disconnections gracefully
- A simple client-side implementation
Connection Lifecycle Management
Understanding the WebSocket connection lifecycle is essential for building robust applications:
- Initialization: Client sends a WebSocket upgrade request
- Handshake: Server accepts the connection with await websocket.accept()
- Data Exchange: Client and server freely exchange messages
- Termination: Either side can close the connection
FastAPI's WebSocket support allows you to handle each stage with:
- await websocket.accept() - Accept the connection
- await websocket.receive_text(), await websocket.receive_json(), etc. - Receive messages
- await websocket.send_text(), await websocket.send_json(), etc. - Send messages
- WebSocket connection closing is handled automatically when the function exits
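The ordering of these stages matters: messages can only flow between the handshake and termination, and a connection can be accepted only once. The sketch below models that ordering as a small state machine. It is loosely based on the four stages listed above, and Stage and LifecycleTracker are hypothetical names rather than FastAPI classes.

```python
from enum import Enum


class Stage(Enum):
    CONNECTING = "connecting"  # upgrade request received, not yet accepted
    OPEN = "open"              # handshake done, messages may flow
    CLOSED = "closed"          # either side has terminated the connection


class LifecycleTracker:
    """Tracks which stage a connection is in and rejects out-of-order calls."""

    def __init__(self) -> None:
        self.stage = Stage.CONNECTING

    def accept(self) -> None:
        if self.stage is not Stage.CONNECTING:
            raise RuntimeError("accept() is only valid once, before the connection is open")
        self.stage = Stage.OPEN

    def exchange(self, payload: str) -> str:
        if self.stage is not Stage.OPEN:
            raise RuntimeError("messages can only be exchanged while the connection is open")
        return payload

    def close(self) -> None:
        # Closing is allowed from any stage and is idempotent.
        self.stage = Stage.CLOSED
```

Calling accept() twice on the same tracker raises RuntimeError, which mirrors the kind of mistake that is easy to make when both an endpoint and a manager try to accept the same connection.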
Summary
In this guide, we've covered:
- Basic WebSocket connections in FastAPI
- Managing multiple WebSocket connections
- Sending and receiving different types of messages
- Implementing authorization for WebSockets
- Building a real-world chat application
WebSockets in FastAPI provide a powerful way to build real-time, interactive applications with bidirectional communication. By leveraging FastAPI's async support, you can handle numerous concurrent connections efficiently.
Additional Resources and Exercises
Exercises
- Build a Real-Time Dashboard: Create a dashboard that displays system statistics (CPU usage, memory, etc.) updated in real-time through WebSockets.
- Collaborative Drawing App: Implement a simple canvas where multiple users can draw simultaneously, with changes broadcast to all connected clients.
- WebSocket Authentication: Modify the chat example to require authentication using JWT tokens before allowing WebSocket connections.
- Reconnection Logic: Enhance the client-side code to intelligently handle disconnections and attempt reconnection with exponential backoff.
- Message History: Add functionality to store recent messages and send them to new users when they connect to the chat.
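As a starting point for the reconnection exercise, the delay schedule behind exponential backoff can be sketched independently of any WebSocket client. The function name backoff_delays and its default values are assumptions made for illustration. The sketch is in Python for consistency with the server code, and the same arithmetic carries over to the browser client.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    attempts: int = 6,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one delay in seconds per reconnection attempt.

    Each delay is base * factor**attempt, limited to `cap`. With jitter > 0,
    up to that fraction of the delay is added at random so that many clients
    do not all reconnect at the same instant.
    """
    rng = rng or random.Random()
    for attempt in range(attempts):
        delay = min(cap, base * factor ** attempt)
        if jitter:
            delay += rng.uniform(0, jitter * delay)
        yield delay
```

A client would wait for the next delay after each failed attempt and restart the sequence once a connection succeeds.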
By mastering WebSocket connections in FastAPI, you'll be able to build sophisticated real-time applications that provide interactive and responsive user experiences.