Skip to main content

FastAPI WebSocket Messages

Introduction

WebSockets in FastAPI provide a powerful way to establish bidirectional communication channels between clients and servers. In this guide, we'll focus specifically on how to send and receive different types of messages through WebSockets. Understanding message handling is crucial for building real-time applications that require instant data exchange.

WebSocket messages can be text-based (like JSON or plain strings) or binary data (like images or files). FastAPI provides convenient methods for handling both types, making it easy to create robust real-time applications.

Basic Message Types

WebSockets support two fundamental message types:

  1. Text messages - Used for sending strings, JSON objects, or other text-based data
  2. Binary messages - Used for sending raw binary data like files, images, or other non-text content

Let's explore how to work with each type in FastAPI.

Sending and Receiving Text Messages

Text messages are the most common type of data exchanged over WebSockets. Here's how to implement a simple echo server that receives and sends back text messages:

python
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

# HTML client for testing the WebSocket
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<input type="text" id="messageText" placeholder="Type a message">
<button onclick="sendMessage()">Send</button>
<ul id="messages"></ul>
<script>
var ws = new WebSocket("ws://localhost:8000/ws");
ws.onmessage = function(event) {
var messages = document.getElementById('messages');
var message = document.createElement('li');
var content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
function sendMessage() {
var messageInput = document.getElementById('messageText');
ws.send(messageInput.value);
messageInput.value = '';
}
</script>
</body>
</html>
"""

@app.get("/")
async def get():
return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive text message from the client
data = await websocket.receive_text()
# Echo the message back to the client
await websocket.send_text(f"Echo: {data}")
except Exception as e:
print(f"Error: {e}")
finally:
await websocket.close()

How It Works

  1. We create a route at /ws for the WebSocket connection
  2. We use websocket.accept() to establish the connection
  3. In a loop, we use websocket.receive_text() to wait for and receive text messages
  4. We then echo the message back to the client using websocket.send_text()
  5. We handle exceptions and ensure the connection is closed properly in the finally block

Working with JSON Messages

JSON is a common format for structured data exchange. FastAPI makes it easy to work with JSON over WebSockets:

python
from fastapi import FastAPI, WebSocket
import json

app = FastAPI()

@app.websocket("/ws/json")
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive text message
data_str = await websocket.receive_text()

# Parse JSON
try:
data = json.loads(data_str)
# Add a response field
response_data = {
"original_message": data,
"response": f"Server received your JSON with {len(data)} fields"
}
# Send JSON response
await websocket.send_json(response_data)
except json.JSONDecodeError:
await websocket.send_text("Error: Invalid JSON format")
except Exception as e:
print(f"Error: {e}")
finally:
await websocket.close()

How It Works

  1. We receive text data using websocket.receive_text()
  2. We parse the received text as JSON using json.loads()
  3. We create a response JSON object
  4. We send the response using websocket.send_json(), which handles JSON serialization for us
  5. We handle JSON parsing errors gracefully

Handling Binary Messages

For applications that need to transfer binary data (like images or files), FastAPI supports binary WebSocket messages:

python
from fastapi import FastAPI, WebSocket
import base64

app = FastAPI()

@app.websocket("/ws/binary")
async def websocket_binary_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive binary data
data = await websocket.receive_bytes()

# Process the binary data (example: get size and convert to base64)
size = len(data)
base64_data = base64.b64encode(data).decode("utf-8")

# Send info about the binary data as text
await websocket.send_text(
f"Received binary data of size {size} bytes. "
f"First 20 bytes in base64: {base64_data[:20]}..."
)

# Echo the binary data back
await websocket.send_bytes(data)
except Exception as e:
print(f"Error: {e}")
finally:
await websocket.close()

How It Works

  1. We use websocket.receive_bytes() to receive binary data
  2. We process the binary data (in this example, we get its size and encode part of it as base64)
  3. We send back information about the binary data as text
  4. We also echo back the original binary data using websocket.send_bytes()

Mixed Message Types

In real applications, you might need to handle different types of messages through the same WebSocket connection. This can be done using the generic receive() method:

python
from fastapi import FastAPI, WebSocket
import json

app = FastAPI()

@app.websocket("/ws/mixed")
async def websocket_mixed_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive message generically
message = await websocket.receive()

# Check the message type
if "text" in message:
# Handle text message
text_data = message["text"]
await websocket.send_text(f"You sent a text message: {text_data}")

elif "bytes" in message:
# Handle binary message
binary_data = message["bytes"]
size = len(binary_data)
await websocket.send_text(f"You sent a binary message of size {size} bytes")

else:
# Unknown message type
await websocket.send_text("Received unknown message type")

except Exception as e:
print(f"Error: {e}")
finally:
await websocket.close()

How It Works

  1. We use the generic websocket.receive() method that returns a dictionary
  2. We check if the dictionary contains a "text" or "bytes" key to determine the type of message
  3. We handle each type of message accordingly

Real-World Example: Real-time Chat Application

Now let's build a more practical example: a simple chat room application where users can join and send messages to everyone else in the room.

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List, Dict
import json

app = FastAPI()

# HTML client for the chat application
html = """
<!DOCTYPE html>
<html>
<head>
<title>FastAPI Chat</title>
<style>
body {
max-width: 600px;
margin: 0 auto;
padding: 20px;
font-family: Arial, sans-serif;
}
#messages {
list-style-type: none;
padding: 0;
border: 1px solid #ddd;
height: 300px;
overflow-y: scroll;
padding: 10px;
margin-bottom: 10px;
}
#messageForm {
display: flex;
}
#messageText {
flex-grow: 1;
padding: 10px;
margin-right: 10px;
}
.system { color: #888; }
.user { color: blue; }
.me { color: green; }
</style>
</head>
<body>
<h1>FastAPI Chat Room</h1>
<div>
<label for="username">Your name:</label>
<input type="text" id="username" placeholder="Enter your name">
<button onclick="connect()">Join Chat</button>
</div>
<ul id="messages"></ul>
<form id="messageForm" style="display: none;" onsubmit="sendMessage(event)">
<input type="text" id="messageText" placeholder="Type a message">
<button type="submit">Send</button>
</form>
<script>
let ws = null;

function connect() {
const username = document.getElementById('username').value;
if (!username) {
alert("Please enter a name first!");
return;
}

if (ws) {
ws.close();
}

// Connect to WebSocket with username as a query parameter
ws = new WebSocket(`ws://localhost:8000/ws/chat?username=${username}`);

// Show the message form
document.getElementById('messageForm').style.display = "flex";

// Handle incoming messages
ws.onmessage = function(event) {
const messages = document.getElementById('messages');
const message = document.createElement('li');

// Parse the JSON message
const data = JSON.parse(event.data);

// Apply different styles based on message type
if (data.type === 'system') {
message.className = 'system';
} else if (data.sender === username) {
message.className = 'me';
} else {
message.className = 'user';
}

message.textContent = data.message;
messages.appendChild(message);
messages.scrollTop = messages.scrollHeight;
};

ws.onclose = function(event) {
document.getElementById('messages').innerHTML += '<li class="system">Connection closed</li>';
document.getElementById('messageForm').style.display = "none";
};
}

function sendMessage(event) {
event.preventDefault();
const messageInput = document.getElementById('messageText');
const message = messageInput.value;

if (message && ws) {
ws.send(JSON.stringify({
message: message
}));
messageInput.value = '';
}
}
</script>
</body>
</html>
"""

# Connection manager to keep track of active connections
class ConnectionManager:
def __init__(self):
# Active connections
self.active_connections: Dict[str, WebSocket] = {}

async def connect(self, websocket: WebSocket, username: str):
await websocket.accept()
self.active_connections[username] = websocket

def disconnect(self, username: str):
if username in self.active_connections:
del self.active_connections[username]

async def send_personal_message(self, message: str, type: str, username: str):
if username in self.active_connections:
payload = {
"message": message,
"sender": "Server",
"type": type
}
await self.active_connections[username].send_json(payload)

async def broadcast(self, message: str, sender: str, type: str = "message"):
payload = {
"message": message,
"sender": sender,
"type": type
}
for username, connection in list(self.active_connections.items()):
await connection.send_json(payload)

manager = ConnectionManager()

@app.get("/")
async def get():
return HTMLResponse(html)

@app.websocket("/ws/chat")
async def websocket_endpoint(websocket: WebSocket, username: str = "Anonymous"):
await manager.connect(websocket, username)
await manager.broadcast(f"{username} has joined the chat", "Server", "system")

try:
while True:
# Receive JSON data
data_str = await websocket.receive_text()
data = json.loads(data_str)

# Broadcast the message to all connected clients
await manager.broadcast(f"{username}: {data['message']}", username)

except WebSocketDisconnect:
# Handle client disconnection
manager.disconnect(username)
await manager.broadcast(f"{username} has left the chat", "Server", "system")
except Exception as e:
print(f"Error: {e}")
manager.disconnect(username)

How the Chat Application Works

  1. We create a ConnectionManager class to track active WebSocket connections
  2. Each client connects with a username as a query parameter
  3. Messages are sent as JSON objects and broadcast to all connected clients
  4. We handle disconnections gracefully
  5. The frontend displays messages differently based on their type (system message or user message)

Advanced Message Handling: Compression and Large Files

For advanced use cases like sending large files or compressed data, you can combine WebSockets with binary data processing:

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import zlib
import io

app = FastAPI()

@app.websocket("/ws/compressed")
async def websocket_compressed(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive compressed binary data
compressed_data = await websocket.receive_bytes()

# Decompress the data
try:
decompressed_data = zlib.decompress(compressed_data)
decompressed_size = len(decompressed_data)

# Send acknowledgment
await websocket.send_json({
"status": "success",
"message": f"Received and decompressed {len(compressed_data)} bytes into {decompressed_size} bytes",
"compression_ratio": f"{len(compressed_data) / decompressed_size:.2f}"
})

except zlib.error:
await websocket.send_json({
"status": "error",
"message": "Failed to decompress data"
})

except WebSocketDisconnect:
print("Client disconnected")
except Exception as e:
print(f"Error: {e}")

Summary

In this guide, we've covered how to send and receive different types of messages using FastAPI WebSockets:

  • Text messages with receive_text() and send_text()
  • JSON messages with receive_text() + JSON parsing and send_json()
  • Binary data with receive_bytes() and send_bytes()
  • Mixed message types with the generic receive() method
  • Real-world application: Building a chat room
  • Advanced handling: Compression and large files

By understanding these different message types and how to handle them, you can build sophisticated real-time applications using FastAPI WebSockets.

Further Learning and Exercises

To practice and enhance your understanding of FastAPI WebSocket messages, try these exercises:

  1. File Transfer Application: Create a WebSocket endpoint that allows users to upload and download files.

  2. Real-time Dashboard: Build a dashboard that streams random data every second and displays it as a chart on the frontend.

  3. Multi-room Chat: Extend the chat example to support multiple chat rooms.

  4. Message Encryption: Implement end-to-end encryption for WebSocket messages.

  5. Protocol Implementation: Implement a simple custom protocol over WebSockets (e.g., a protocol that includes message type, sequence number, and payload).

Additional Resources



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)