FastAPI WebSockets Introduction
What are WebSockets?
WebSockets provide a persistent connection between a client and server that both parties can use to start sending data at any time. Unlike the traditional HTTP request-response model, WebSockets enable real-time communication where both the client and server can send messages independently without needing to establish a new connection each time.
Traditional HTTP connections:
- Client makes a request
- Server sends a response
- Connection closes
WebSocket connections:
- Client makes an initial handshake request
- Connection is upgraded from HTTP to WebSocket
- Connection remains open for continuous two-way communication
- Either party can send messages at any time
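The upgrade step above includes a small challenge-response defined in RFC 6455: the client sends a random Sec-WebSocket-Key header, and the server answers with a matching Sec-WebSocket-Accept value. FastAPI and the server underneath it handle this for you, so the sketch below is only meant to illustrate the computation. The function name compute_accept_key is made up for this example.

```python
import base64
import hashlib

# Fixed GUID that RFC 6455 defines for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def compute_accept_key(sec_websocket_key: str) -> str:
    """Return the Sec-WebSocket-Accept value for a client's Sec-WebSocket-Key."""
    combined = (sec_websocket_key + WEBSOCKET_GUID).encode("ascii")
    digest = hashlib.sha1(combined).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from the handshake example in RFC 6455.
print(compute_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Once the client has checked this value, both sides stop speaking HTTP on that connection and exchange WebSocket frames instead.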
Why Use WebSockets with FastAPI?
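FastAPI is built around asynchronous request handling, so a single process can keep many WebSocket connections open at once. That makes it a good fit for what WebSockets offer: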
- Real-time applications: Chat applications, live dashboards, collaborative tools
- Reduced latency: No need to establish new connections for each message
- Server-initiated updates: Server can push updates to clients without polling
- Efficient resource usage: Fewer connections needed compared to HTTP polling
Basic WebSocket Implementation in FastAPI
Let's start with a simple WebSocket example. First, ensure you have the necessary dependencies:
pip install fastapi uvicorn websockets
Now, let's create a basic echo server that receives messages and sends them back:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message received: {data}")
    except WebSocketDisconnect:
        # The client closed the connection, so there is nothing left to close
        print("Client disconnected")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
Let's break down this code:
- We import the names we need from fastapi, including FastAPI and WebSocket
- We create a FastAPI application instance
- We define an endpoint with the @app.websocket decorator
- Inside the endpoint function:
  - We accept the WebSocket connection with await websocket.accept()
  - We enter a loop that continuously receives and sends messages
  - We catch the error raised when the connection ends so the endpoint exits cleanly
Testing Your WebSocket
You can test your WebSocket implementation using a variety of tools:
Browser-based Testing
Create a simple HTML file named websocket_test.html:
<!DOCTYPE html>
<html>
<head>
    <title>FastAPI WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test</h1>
    <form action="" onsubmit="sendMessage(event)">
        <input type="text" id="messageText" autocomplete="off"/>
        <button>Send</button>
    </form>
    <ul id="messages">
    </ul>
    <script>
        var ws = new WebSocket("ws://localhost:8000/ws");
        ws.onmessage = function(event) {
            var messages = document.getElementById('messages');
            var message = document.createElement('li');
            var content = document.createTextNode(event.data);
            message.appendChild(content);
            messages.appendChild(message);
        };
        function sendMessage(event) {
            var input = document.getElementById("messageText");
            ws.send(input.value);
            input.value = '';
            event.preventDefault();
        }
    </script>
</body>
</html>
Open this HTML file in your browser while your FastAPI application is running, and you'll be able to send messages and see the responses.
WebSocket Connection States
WebSocket connections in FastAPI can be in several states:
- Connecting: Initial state when the connection request is made
- Connected: After websocket.accept() is called
- Disconnected: After websocket.close() or when an error occurs
It's important to properly manage these states to avoid resources remaining allocated when connections are no longer active.
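To make the three states concrete, here is a toy model of the lifecycle. It is only a sketch for reasoning about which moves are legal, not FastAPI's internal implementation, and the names ConnectionState, ALLOWED_TRANSITIONS and TrackedConnection are invented for this example.

```python
from enum import Enum, auto


class ConnectionState(Enum):
    CONNECTING = auto()
    CONNECTED = auto()
    DISCONNECTED = auto()


# Legal moves between states in this toy model.
ALLOWED_TRANSITIONS = {
    # A pending connection is either accepted or rejected.
    ConnectionState.CONNECTING: {ConnectionState.CONNECTED, ConnectionState.DISCONNECTED},
    ConnectionState.CONNECTED: {ConnectionState.DISCONNECTED},
    # A closed connection cannot be reused.
    ConnectionState.DISCONNECTED: set(),
}


class TrackedConnection:
    """Hypothetical wrapper that records which state a connection is in."""

    def __init__(self) -> None:
        self.state = ConnectionState.CONNECTING

    def transition(self, new_state: ConnectionState) -> None:
        if new_state not in ALLOWED_TRANSITIONS[self.state]:
            raise RuntimeError(f"Cannot go from {self.state.name} to {new_state.name}")
        self.state = new_state

    def can_send(self) -> bool:
        # Messages may only flow once the connection has been accepted.
        return self.state is ConnectionState.CONNECTED


conn = TrackedConnection()
conn.transition(ConnectionState.CONNECTED)  # corresponds to websocket.accept()
print(conn.can_send())  # True
```

The point of the model is that Disconnected is terminal: once a connection has closed, any bookkeeping that still refers to it should be removed rather than reused.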
Handling Multiple Clients
For most real-world applications, you'll need to handle multiple WebSocket connections simultaneously. Let's implement a simple chat room:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        # Every accepted WebSocket that has not disconnected yet
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)


manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Send message to the sender
            await manager.send_personal_message(f"You: {data}", websocket)
            # Broadcast message to all connected clients (the sender included)
            await manager.broadcast(f"Client #{client_id} says: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")
In this example:
- We create a ConnectionManager class to handle multiple WebSocket connections
- The manager keeps track of active connections and provides methods to send messages
- When a client connects, we add them to the list of active connections
- When a client disconnects, we remove them from the list
- We can send personal messages to specific clients or broadcast to all connected clients
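Two details of the chat example are worth a closer look: broadcast sends to every connection, the sender included, and a single failed send would interrupt the loop. The sketch below shows one possible variant that skips a given connection and drops connections whose send fails. It runs without FastAPI by using FakeSocket, a hypothetical stand-in that only implements send_text; SafeConnectionManager and the exclude parameter are likewise assumptions made for this illustration.

```python
import asyncio
from typing import List, Optional


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: records messages, optionally fails."""

    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.sent: List[str] = []

    async def send_text(self, message: str) -> None:
        if self.broken:
            raise RuntimeError("connection is gone")
        self.sent.append(message)


class SafeConnectionManager:
    def __init__(self) -> None:
        self.active_connections: List[FakeSocket] = []

    async def broadcast(self, message: str, exclude: Optional[FakeSocket] = None) -> None:
        # Iterate over a copy so that removals do not disturb the loop.
        for connection in list(self.active_connections):
            if connection is exclude:
                continue
            try:
                await connection.send_text(message)
            except Exception:
                # Assumption: any send failure means the peer is unusable.
                self.active_connections.remove(connection)


async def demo() -> SafeConnectionManager:
    manager = SafeConnectionManager()
    alice, bob, dead = FakeSocket(), FakeSocket(), FakeSocket(broken=True)
    manager.active_connections.extend([alice, bob, dead])
    await manager.broadcast("Client #alice says: hi", exclude=alice)
    return manager


manager = asyncio.run(demo())
print(len(manager.active_connections))  # 2
```

In a real endpoint you would pass the sender's WebSocket as exclude, so the sender only sees the "You: ..." message.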
Sending Different Types of Data
WebSockets can transmit different types of data:
Text Data
# Sending text
await websocket.send_text("Hello, client!")
# Receiving text
text_data = await websocket.receive_text()
JSON Data
# Sending JSON
await websocket.send_json({"message": "Hello", "user": "system"})
# Receiving JSON
json_data = await websocket.receive_json()
Binary Data
# Sending binary data
await websocket.send_bytes(b"Binary data here")
# Receiving binary data
binary_data = await websocket.receive_bytes()
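When one connection carries several kinds of messages, a common approach is to wrap each JSON payload in a small envelope with a type field so the receiver knows how to handle it. The sketch below shows one way to do that with plain strings, which you could pass to send_text or read from receive_text. The helper names encode_message and decode_message and the field names "type" and "payload" are assumptions made for this example.

```python
import json
from typing import Any, Dict, Tuple


def encode_message(message_type: str, payload: Dict[str, Any]) -> str:
    """Wrap a payload in an envelope that can be sent as a text message."""
    return json.dumps({"type": message_type, "payload": payload})


def decode_message(raw: str) -> Tuple[str, Dict[str, Any]]:
    """Parse a received text message and check that it has the expected shape."""
    data = json.loads(raw)
    if not isinstance(data, dict) or "type" not in data:
        raise ValueError("message must be a JSON object with a 'type' field")
    return data["type"], data.get("payload", {})


raw = encode_message("chat", {"user": "system", "message": "Hello"})
message_type, payload = decode_message(raw)
print(message_type, payload["message"])  # chat Hello
```

Validating the shape on receipt matters because a client can send any text it likes over the socket.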
Real-world Example: Live Dashboard
Let's create a more practical example - a simple live dashboard that sends periodic updates:
import asyncio
import random
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class DashboardManager:
    def __init__(self):
        self.active_connections = []
        self.generator_task = None

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        # Start generating data when the first client connects
        if self.generator_task is None:
            self.generator_task = asyncio.create_task(self.data_generator())

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
        # Stop generating data when the last client leaves
        if len(self.active_connections) == 0 and self.generator_task:
            self.generator_task.cancel()
            self.generator_task = None

    async def data_generator(self):
        while True:
            # Generate random metrics
            data = {
                "cpu_usage": random.randint(0, 100),
                "memory_usage": random.randint(0, 100),
                "active_users": random.randint(10, 1000),
                "response_time": round(random.uniform(0.1, 2.0), 2)
            }
            # Send to all connected clients (iterate over a copy, because
            # clients may disconnect while we are awaiting a send)
            for connection in list(self.active_connections):
                await connection.send_json(data)
            # Wait before sending the next update
            await asyncio.sleep(2)


dashboard = DashboardManager()


@app.websocket("/dashboard-feed")
async def dashboard_endpoint(websocket: WebSocket):
    await dashboard.connect(websocket)
    try:
        while True:
            # Just keep the connection alive; incoming messages are ignored
            await websocket.receive_text()
    except WebSocketDisconnect:
        dashboard.disconnect(websocket)
This example:
- Creates a persistent WebSocket connection for dashboard clients
- Generates random system metrics every 2 seconds
- Broadcasts the metrics to all connected clients
- Efficiently manages resources by only generating data when clients are connected
You could create a simple frontend to display this data:
<!DOCTYPE html>
<html>
<head>
    <title>Live Dashboard</title>
    <style>
        .metric {
            padding: 10px;
            margin: 5px;
            border: 1px solid #ddd;
            border-radius: 5px;
            display: inline-block;
            width: 150px;
            text-align: center;
        }
        .value {
            font-size: 24px;
            font-weight: bold;
        }
    </style>
</head>
<body>
    <h1>System Dashboard</h1>
    <div id="dashboard">
        <div class="metric">
            <div>CPU Usage</div>
            <div id="cpu" class="value">0%</div>
        </div>
        <div class="metric">
            <div>Memory Usage</div>
            <div id="memory" class="value">0%</div>
        </div>
        <div class="metric">
            <div>Active Users</div>
            <div id="users" class="value">0</div>
        </div>
        <div class="metric">
            <div>Response Time</div>
            <div id="response" class="value">0 ms</div>
        </div>
    </div>
    <script>
        const ws = new WebSocket("ws://localhost:8000/dashboard-feed");
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            document.getElementById('cpu').textContent = data.cpu_usage + '%';
            document.getElementById('memory').textContent = data.memory_usage + '%';
            document.getElementById('users').textContent = data.active_users;
            document.getElementById('response').textContent = data.response_time + ' ms';
        };
        ws.onclose = function(event) {
            console.log("Connection closed");
        };
    </script>
</body>
</html>
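Going back to the DashboardManager on the server side, the part that is easiest to get wrong is the task lifecycle: start the generator when the first client arrives and cancel it when the last one leaves. The reduced sketch below isolates that logic so it can run without FastAPI. Ticker is a hypothetical class for this example, and its subscribers are plain lists standing in for WebSocket connections.

```python
import asyncio
from typing import List, Optional


class Ticker:
    """Hypothetical reduction of DashboardManager; subscribers are plain lists."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self.subscribers: List[List[int]] = []
        self.task: Optional[asyncio.Task] = None

    def subscribe(self, inbox: List[int]) -> None:
        self.subscribers.append(inbox)
        if self.task is None:
            # First subscriber: start producing data.
            self.task = asyncio.create_task(self._produce())

    def unsubscribe(self, inbox: List[int]) -> None:
        self.subscribers.remove(inbox)
        if not self.subscribers and self.task is not None:
            # Last subscriber left: stop producing data.
            self.task.cancel()
            self.task = None

    async def _produce(self) -> None:
        tick = 0
        while True:
            for inbox in list(self.subscribers):
                inbox.append(tick)
            tick += 1
            await asyncio.sleep(self.interval)


async def demo():
    ticker = Ticker(interval=0.01)
    inbox: List[int] = []
    ticker.subscribe(inbox)        # starts the background task
    await asyncio.sleep(0.05)      # let some ticks arrive
    ticker.unsubscribe(inbox)      # cancels the background task
    count_at_stop = len(inbox)
    await asyncio.sleep(0.05)      # nothing should arrive any more
    return inbox, count_at_stop, ticker.task


inbox, count_at_stop, task = asyncio.run(demo())
print(len(inbox) == count_at_stop, task is None)  # True True
```

Resetting the task reference to None after cancelling is what allows a later subscriber to start a fresh task.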
Best Practices for WebSockets in FastAPI
- Error Handling: Handle disconnects and unexpected errors explicitly, so that a failing connection is logged and cleaned up instead of leaving stale state behind.
try:
    # WebSocket operations
    ...
except WebSocketDisconnect:
    # Handle normal disconnection
    ...
except Exception as e:
    # Handle unexpected errors (logger is assumed to be configured elsewhere)
    logger.error(f"WebSocket error: {e}")
finally:
    # Clean up resources
    ...
- Authentication: Verify who the client is before accepting the connection:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Verify the token from the query parameters
    # (is_valid_token is a placeholder for your own validation logic)
    token = websocket.query_params.get("token")
    if not is_valid_token(token):
        await websocket.close(code=1008)  # Policy violation
        return
    await websocket.accept()
    # Proceed with authenticated connection
- Rate limiting: Prevent abuse by implementing rate limiting.
- Connection timeouts: Implement heartbeats to detect and clean up stale connections.
- Graceful degradation: Provide fallback mechanisms when WebSockets aren't supported.
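For the connection-timeout item above, one simple approach is to bound how long the server waits for the next message and treat silence as a stale connection. The sketch below uses asyncio.wait_for from the standard library. SilentSocket is a hypothetical stand-in for a client that never sends anything, and receive_or_timeout is a helper name invented for this example. It is a starting point under those assumptions rather than a complete heartbeat protocol.

```python
import asyncio
from typing import Optional


class SilentSocket:
    """Hypothetical stand-in for a WebSocket whose client never sends anything."""

    async def receive_text(self) -> str:
        await asyncio.sleep(3600)
        return ""


async def receive_or_timeout(websocket, timeout: float) -> Optional[str]:
    """Return the next text message, or None if none arrives within `timeout` seconds."""
    try:
        return await asyncio.wait_for(websocket.receive_text(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def demo() -> Optional[str]:
    message = await receive_or_timeout(SilentSocket(), timeout=0.05)
    # In a real endpoint, a None result is where you might close the connection.
    return message


print(asyncio.run(demo()))  # None
```

A fuller heartbeat would also have the server send a periodic ping message and expect a reply, which this sketch leaves out.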
Summary
WebSockets in FastAPI enable real-time bidirectional communication between clients and servers. They're perfect for applications requiring live updates, chat functionality, collaborative editing, and more. FastAPI's asynchronous foundation makes it particularly well-suited for efficient WebSocket implementation.
In this introduction, we've covered:
- The basics of WebSocket connections
- Simple echo server implementation
- Managing multiple client connections
- Handling various data types
- A practical dashboard example
- Best practices for production use
Exercises
- Modify the chat example to display a list of active users.
- Create a collaborative drawing application where multiple users can draw on the same canvas.
- Implement a notification system that sends real-time alerts to admin users.
- Add secure authentication to the WebSocket connection using JWT tokens.
- Build a real-time multiplayer game using WebSockets for communication.
Happy coding with FastAPI WebSockets!