FastAPI WebSocket Messages
Introduction
WebSockets in FastAPI provide a powerful way to establish bidirectional communication channels between clients and servers. In this guide, we'll focus specifically on how to send and receive different types of messages through WebSockets. Understanding message handling is crucial for building real-time applications that require instant data exchange.
WebSocket messages can be text-based (like JSON or plain strings) or binary data (like images or files). FastAPI provides convenient methods for handling both types, making it easy to create robust real-time applications.
Basic Message Types
WebSockets support two fundamental message types:
- Text messages - Used for sending strings, JSON objects, or other text-based data
- Binary messages - Used for sending raw binary data like files, images, or other non-text content
Let's explore how to work with each type in FastAPI.
Sending and Receiving Text Messages
Text messages are the most common type of data exchanged over WebSockets. Here's how to implement a simple echo server that receives and sends back text messages:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from starlette.websockets import WebSocketState

app = FastAPI()

# HTML client for testing the WebSocket
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <input type="text" id="messageText" placeholder="Type a message">
        <button onclick="sendMessage()">Send</button>
        <ul id="messages"></ul>
        <script>
            var ws = new WebSocket("ws://localhost:8000/ws");
            ws.onmessage = function(event) {
                var messages = document.getElementById('messages');
                var message = document.createElement('li');
                var content = document.createTextNode(event.data);
                message.appendChild(content);
                messages.appendChild(message);
            };
            function sendMessage() {
                var messageInput = document.getElementById('messageText');
                ws.send(messageInput.value);
                messageInput.value = '';
            }
        </script>
    </body>
</html>
"""


@app.get("/")
async def get():
    return HTMLResponse(html)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive text message from the client
            data = await websocket.receive_text()
            # Echo the message back to the client
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client closed the connection; this is the normal way out of the loop
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close only if the client has not already disconnected;
        # closing an already-closed connection can raise a RuntimeError
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close()
How It Works
- We create a route at /ws for the WebSocket connection
- We use websocket.accept() to establish the connection
- In a loop, we use websocket.receive_text() to wait for and receive text messages
- We then echo the message back to the client using websocket.send_text()
- We handle exceptions and ensure the connection is closed properly in the finally block
Working with JSON Messages
JSON is a common format for structured data exchange. FastAPI makes it easy to work with JSON over WebSockets:
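import json

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

app = FastAPI()


@app.websocket("/ws/json")
async def websocket_json_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive text message
            data_str = await websocket.receive_text()
            # Parse JSON
            try:
                data = json.loads(data_str)
            except json.JSONDecodeError:
                await websocket.send_text("Error: Invalid JSON format")
                continue
            # json.loads also accepts numbers, strings and lists,
            # so make sure we got an object before counting its fields
            if not isinstance(data, dict):
                await websocket.send_text("Error: Expected a JSON object")
                continue
            # Add a response field
            response_data = {
                "original_message": data,
                "response": f"Server received your JSON with {len(data)} fields"
            }
            # Send JSON response
            await websocket.send_json(response_data)
    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close only if the client has not already disconnected
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close()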
How It Works
- We receive text data using websocket.receive_text()
- We parse the received text as JSON using json.loads()
- We create a response JSON object
- We send the response using websocket.send_json(), which handles JSON serialization for us
- We handle JSON parsing errors gracefully
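The parse-then-respond step can also be pulled out of the endpoint into a plain function, which makes it testable without opening a connection. The following is a minimal sketch of that idea; build_json_reply is a hypothetical helper name (not part of FastAPI), and the "status" field in the reply is an assumption made for illustration.

```python
import json
from typing import Any, Dict


def build_json_reply(raw: str) -> Dict[str, Any]:
    """Turn one incoming text frame into a JSON-serializable reply.

    Hypothetical helper: an endpoint would pass the result to send_json().
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"status": "error", "message": "Invalid JSON format"}

    # json.loads also accepts numbers, strings and lists, so check for an object
    if not isinstance(data, dict):
        return {"status": "error", "message": "Expected a JSON object"}

    return {
        "status": "ok",
        "original_message": data,
        "response": f"Server received your JSON with {len(data)} fields",
    }
```

With a helper like this, the loop body of the endpoint could shrink to receiving the text, calling build_json_reply(), and sending the result with websocket.send_json().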
Handling Binary Messages
For applications that need to transfer binary data (like images or files), FastAPI supports binary WebSocket messages:
import base64

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

app = FastAPI()


@app.websocket("/ws/binary")
async def websocket_binary_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive binary data
            data = await websocket.receive_bytes()
            # Process the binary data (example: get size and convert to base64)
            size = len(data)
            base64_data = base64.b64encode(data).decode("utf-8")
            # Send info about the binary data as text
            await websocket.send_text(
                f"Received binary data of size {size} bytes. "
                f"First 20 base64 characters: {base64_data[:20]}..."
            )
            # Echo the binary data back
            await websocket.send_bytes(data)
    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close only if the client has not already disconnected
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close()
How It Works
- We use websocket.receive_bytes() to receive binary data
- We process the binary data (in this example, we get its size and encode it as base64)
- We send back information about the binary data as text
- We also echo back the original binary data using websocket.send_bytes()
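Base64 turns every 3 bytes into 4 characters, so a 20-character preview covers the first 15 bytes of the payload. If only a preview is needed, it is cheaper to encode just that prefix rather than the whole message. A small sketch, where preview_base64 is a hypothetical helper name:

```python
import base64


def preview_base64(data: bytes, num_bytes: int = 15) -> str:
    """Base64-encode only the first num_bytes of a payload."""
    return base64.b64encode(data[:num_bytes]).decode("ascii")


# 15 bytes encode to exactly 20 base64 characters
print(len(preview_base64(bytes(100))))  # 20
```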
Mixed Message Types
In real applications, you might need to handle different types of messages through the same WebSocket connection. This can be done using the generic receive() method:
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketState

app = FastAPI()


@app.websocket("/ws/mixed")
async def websocket_mixed_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive message generically
            message = await websocket.receive()
            # receive() returns the disconnect message instead of raising,
            # so leave the loop explicitly
            if message["type"] == "websocket.disconnect":
                break
            # Check the message type (a key may be present with a None value)
            if message.get("text") is not None:
                # Handle text message
                text_data = message["text"]
                await websocket.send_text(f"You sent a text message: {text_data}")
            elif message.get("bytes") is not None:
                # Handle binary message
                binary_data = message["bytes"]
                size = len(binary_data)
                await websocket.send_text(f"You sent a binary message of size {size} bytes")
            else:
                # Unknown message type
                await websocket.send_text("Received unknown message type")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close only if the client has not already disconnected
        if websocket.client_state != WebSocketState.DISCONNECTED:
            await websocket.close()
How It Works
- We use the generic websocket.receive() method that returns a dictionary
- We check if the dictionary contains a "text" or "bytes" key to determine the type of message
- We handle each type of message accordingly
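The dictionary returned by receive() follows the ASGI message format: its "type" is either "websocket.receive" or "websocket.disconnect", and a received message carries its payload under "text" or "bytes". The type check can be sketched as a standalone function; classify_message is a hypothetical name used only for illustration.

```python
from typing import Any, Mapping, Tuple, Union

Payload = Union[str, bytes, None]


def classify_message(message: Mapping[str, Any]) -> Tuple[str, Payload]:
    """Classify a raw ASGI WebSocket message as returned by receive().

    Returns ("disconnect", None), ("text", str), ("bytes", bytes)
    or ("unknown", None).
    """
    if message.get("type") == "websocket.disconnect":
        return "disconnect", None
    # A key can be present but None, so test the value rather than the key
    if message.get("text") is not None:
        return "text", message["text"]
    if message.get("bytes") is not None:
        return "bytes", message["bytes"]
    return "unknown", None
```

Checking for the disconnect message first matters: once it has been received, no further messages can be sent or received on that connection.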
Real-World Example: Real-time Chat Application
Now let's build a more practical example: a simple chat room application where users can join and send messages to everyone else in the room.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import Dict
import json
app = FastAPI()
# HTML client for the chat application
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>FastAPI Chat</title>
        <style>
            body {
                max-width: 600px;
                margin: 0 auto;
                padding: 20px;
                font-family: Arial, sans-serif;
            }
            #messages {
                list-style-type: none;
                border: 1px solid #ddd;
                height: 300px;
                overflow-y: scroll;
                padding: 10px;
                margin-bottom: 10px;
            }
            #messageForm {
                display: flex;
            }
            #messageText {
                flex-grow: 1;
                padding: 10px;
                margin-right: 10px;
            }
            .system { color: #888; }
            .user { color: blue; }
            .me { color: green; }
        </style>
    </head>
    <body>
        <h1>FastAPI Chat Room</h1>
        <div>
            <label for="username">Your name:</label>
            <input type="text" id="username" placeholder="Enter your name">
            <button onclick="connect()">Join Chat</button>
        </div>
        <ul id="messages"></ul>
        <form id="messageForm" style="display: none;" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" placeholder="Type a message">
            <button type="submit">Send</button>
        </form>
        <script>
            let ws = null;

            function connect() {
                const username = document.getElementById('username').value;
                if (!username) {
                    alert("Please enter a name first!");
                    return;
                }
                if (ws) {
                    ws.close();
                }
                // Connect to WebSocket with the username as a URL-encoded query parameter
                ws = new WebSocket(`ws://localhost:8000/ws/chat?username=${encodeURIComponent(username)}`);
                // Show the message form
                document.getElementById('messageForm').style.display = "flex";
                // Handle incoming messages
                ws.onmessage = function(event) {
                    const messages = document.getElementById('messages');
                    const message = document.createElement('li');
                    // Parse the JSON message
                    const data = JSON.parse(event.data);
                    // Apply different styles based on message type
                    if (data.type === 'system') {
                        message.className = 'system';
                    } else if (data.sender === username) {
                        message.className = 'me';
                    } else {
                        message.className = 'user';
                    }
                    message.textContent = data.message;
                    messages.appendChild(message);
                    messages.scrollTop = messages.scrollHeight;
                };
                ws.onclose = function(event) {
                    document.getElementById('messages').innerHTML += '<li class="system">Connection closed</li>';
                    document.getElementById('messageForm').style.display = "none";
                };
            }

            function sendMessage(event) {
                event.preventDefault();
                const messageInput = document.getElementById('messageText');
                const message = messageInput.value;
                if (message && ws) {
                    ws.send(JSON.stringify({
                        message: message
                    }));
                    messageInput.value = '';
                }
            }
        </script>
    </body>
</html>
"""
# Connection manager to keep track of active connections
class ConnectionManager:
    def __init__(self):
        # Active connections
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, username: str):
        await websocket.accept()
        self.active_connections[username] = websocket

    def disconnect(self, username: str):
        if username in self.active_connections:
            del self.active_connections[username]

    async def send_personal_message(self, message: str, type: str, username: str):
        if username in self.active_connections:
            payload = {
                "message": message,
                "sender": "Server",
                "type": type
            }
            await self.active_connections[username].send_json(payload)

    async def broadcast(self, message: str, sender: str, type: str = "message"):
        payload = {
            "message": message,
            "sender": sender,
            "type": type
        }
        for username, connection in list(self.active_connections.items()):
            await connection.send_json(payload)


manager = ConnectionManager()


@app.get("/")
async def get():
    return HTMLResponse(html)


@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket, username: str = "Anonymous"):
    await manager.connect(websocket, username)
    await manager.broadcast(f"{username} has joined the chat", "Server", "system")
    try:
        while True:
            # Receive JSON data
            data_str = await websocket.receive_text()
            data = json.loads(data_str)
            # Broadcast the message to all connected clients
            await manager.broadcast(f"{username}: {data['message']}", username)
    except WebSocketDisconnect:
        # Handle client disconnection
        manager.disconnect(username)
        await manager.broadcast(f"{username} has left the chat", "Server", "system")
    except Exception as e:
        print(f"Error: {e}")
        manager.disconnect(username)
How the Chat Application Works
- We create a ConnectionManager class to track active WebSocket connections
- Each client connects with a username as a query parameter
- Messages are sent as JSON objects and broadcast to all connected clients
- We handle disconnections gracefully
- The frontend displays messages differently based on their type (system message or user message)
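One limitation of the broadcast() method above is that a single failed send_json() call raises and stops the loop, so the remaining clients never receive the message. The sketch below shows one possible way to tolerate dead connections. FakeConnection and safe_broadcast are hypothetical names; FakeConnection stands in for a real WebSocket so the example runs without a server.

```python
import asyncio
from typing import Dict, List


class FakeConnection:
    """Stand-in for a WebSocket; only implements send_json()."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.sent: List[dict] = []

    async def send_json(self, payload: dict) -> None:
        if self.fail:
            raise RuntimeError("connection is closed")
        self.sent.append(payload)


async def safe_broadcast(connections: Dict[str, FakeConnection], payload: dict) -> List[str]:
    """Send payload to everyone; drop and report connections that fail."""
    dropped = []
    # Iterate over a copy so the dict can be modified inside the loop
    for username, connection in list(connections.items()):
        try:
            await connection.send_json(payload)
        except Exception:
            connections.pop(username, None)
            dropped.append(username)
    return dropped


connections = {"alice": FakeConnection(), "bob": FakeConnection(fail=True)}
payload = {"message": "hi", "sender": "alice", "type": "message"}
dropped = asyncio.run(safe_broadcast(connections, payload))
print(dropped)  # ['bob']
```

In the chat application, the same try/except could wrap the send_json() call inside ConnectionManager.broadcast(), with failed usernames passed to disconnect().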
Advanced Message Handling: Compression and Large Files
For larger payloads, the client can compress data before sending it as a binary message. The endpoint below receives zlib-compressed bytes, decompresses them, and reports the compressed and decompressed sizes:
import zlib

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws/compressed")
async def websocket_compressed(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive compressed binary data
            compressed_data = await websocket.receive_bytes()
            # Decompress the data
            try:
                decompressed_data = zlib.decompress(compressed_data)
            except zlib.error:
                await websocket.send_json({
                    "status": "error",
                    "message": "Failed to decompress data"
                })
                continue
            decompressed_size = len(decompressed_data)
            # Avoid dividing by zero when the payload decompresses to nothing
            ratio = len(compressed_data) / decompressed_size if decompressed_size else 0.0
            # Send acknowledgment
            await websocket.send_json({
                "status": "success",
                "message": f"Received and decompressed {len(compressed_data)} bytes into {decompressed_size} bytes",
                "compression_ratio": f"{ratio:.2f}"
            })
    except WebSocketDisconnect:
        print("Client disconnected")
    except Exception as e:
        print(f"Error: {e}")
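To try the endpoint, a client needs to send bytes that zlib.decompress() can read. The following sketch prepares such a payload with the standard library; compress_payload and compression_ratio are hypothetical helper names, and the sample data is made up for illustration.

```python
import zlib


def compress_payload(data: bytes, level: int = 6) -> bytes:
    """Compress bytes so that zlib.decompress() on the server can read them."""
    return zlib.compress(data, level)


def compression_ratio(compressed: bytes, original: bytes) -> float:
    """Compressed size divided by original size; 0.0 for empty input."""
    if not original:
        return 0.0
    return len(compressed) / len(original)


original = b"FastAPI WebSocket " * 1000
compressed = compress_payload(original)

# Round-trip check: the server-side call recovers the original bytes
assert zlib.decompress(compressed) == original
print(f"{len(original)} -> {len(compressed)} bytes, "
      f"ratio {compression_ratio(compressed, original):.2f}")
```

The resulting bytes would then be sent as a single binary WebSocket message. Highly repetitive data like this sample compresses well; already-compressed formats such as JPEG or ZIP usually gain little.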
Summary
In this guide, we've covered how to send and receive different types of messages using FastAPI WebSockets:
- Text messages with receive_text() and send_text()
- JSON messages with receive_text() + JSON parsing and send_json()
- Binary data with receive_bytes() and send_bytes()
- Mixed message types with the generic receive() method
- Real-world application: Building a chat room
- Advanced handling: Compression and large files
By understanding these different message types and how to handle them, you can build sophisticated real-time applications using FastAPI WebSockets.
Further Learning and Exercises
To practice and enhance your understanding of FastAPI WebSocket messages, try these exercises:
- File Transfer Application: Create a WebSocket endpoint that allows users to upload and download files.
- Real-time Dashboard: Build a dashboard that streams random data every second and displays it as a chart on the frontend.
- Multi-room Chat: Extend the chat example to support multiple chat rooms.
- Message Encryption: Implement end-to-end encryption for WebSocket messages.
- Protocol Implementation: Implement a simple custom protocol over WebSockets (e.g., a protocol that includes message type, sequence number, and payload).
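As a possible starting point for the last exercise, a binary frame can be built with the standard struct module. The layout below (a 1-byte message type followed by a 4-byte sequence number in network byte order, then the payload) is an assumption chosen for illustration, and encode_frame and decode_frame are hypothetical names.

```python
import struct
from typing import Tuple

# Assumed layout: 1-byte message type, 4-byte sequence number (network byte order)
HEADER = struct.Struct("!BI")


def encode_frame(msg_type: int, seq: int, payload: bytes) -> bytes:
    """Prefix the payload with the fixed-size header."""
    return HEADER.pack(msg_type, seq) + payload


def decode_frame(frame: bytes) -> Tuple[int, int, bytes]:
    """Split a frame into (message type, sequence number, payload)."""
    if len(frame) < HEADER.size:
        raise ValueError("frame shorter than header")
    msg_type, seq = HEADER.unpack_from(frame)
    return msg_type, seq, frame[HEADER.size:]


frame = encode_frame(1, 42, b"hello")
print(decode_frame(frame))  # (1, 42, b'hello')
```

Because WebSocket already preserves message boundaries, this sketch does not include a payload length field. Frames like these would be exchanged with send_bytes() and receive_bytes().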
Additional Resources
- FastAPI Official Documentation on WebSockets
- MDN WebSocket API Documentation
- Socket.IO - A library that can be used alongside FastAPI for more complex real-time applications
- HTML5 WebSocket - For understanding the client-side aspect