Skip to main content

FastAPI WebSocket Broadcasting

Introduction

WebSockets provide a powerful way to implement real-time bidirectional communication between clients and servers. While basic WebSocket connections work well for one-to-one communication, many applications require the ability to send messages to multiple connected clients simultaneously. This capability is called broadcasting.

In this tutorial, we'll explore how to implement WebSocket broadcasting in FastAPI, allowing you to build applications like chat rooms, live notifications, or multiplayer games where updates need to be shared with multiple users in real-time.

What is WebSocket Broadcasting?

Broadcasting refers to the server's ability to send a message to multiple connected WebSocket clients at once. Common use cases include:

  • Chat applications where messages are shared with all participants
  • Live dashboards where updates are pushed to all viewers
  • Collaborative editing tools where changes by one user are visible to others
  • Multiplayer games where player actions affect the shared game state

Prerequisites

Before diving into broadcasting, make sure you:

  1. Have FastAPI installed: pip install fastapi uvicorn[standard]
  2. Understand basic WebSocket concepts in FastAPI
  3. Have a basic understanding of async programming in Python

Implementing a Connection Manager

The key to implementing broadcasting is to create a connection manager that keeps track of all active WebSocket connections. Let's build one step by step:

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict, Any

app = FastAPI()

class ConnectionManager:
def __init__(self):
# Store all active connections
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
# Accept the connection
await websocket.accept()
# Store the connection
self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
# Remove the connection
self.active_connections.remove(websocket)

async def broadcast(self, message: Dict[str, Any]):
# Send message to all connected clients
for connection in self.active_connections:
await connection.send_json(message)

# Create a connection manager instance
manager = ConnectionManager()

The ConnectionManager class handles three main tasks:

  • Accepting and tracking new connections
  • Removing connections when clients disconnect
  • Broadcasting messages to all connected clients

Basic Chat Room Example

Let's implement a simple chat room where messages from any client are broadcasted to all connected clients:

python
@app.websocket("/ws/chat")
async def chat_endpoint(websocket: WebSocket):
# Accept and store the connection
await manager.connect(websocket)
try:
while True:
# Receive message from the client
data = await websocket.receive_text()

# Create a message with the data
message = {
"type": "message",
"content": data,
"sender_id": id(websocket) # Use object ID as a simple identifier
}

# Broadcast the message to all connected clients
await manager.broadcast(message)
except WebSocketDisconnect:
# Handle disconnection
manager.disconnect(websocket)
# Notify other users about the disconnection
await manager.broadcast({
"type": "system",
"content": f"Client #{id(websocket)} has left the chat"
})

Advanced Broadcasting: Room-Based Communication

In many applications, you might want to broadcast messages only to specific groups of users (rooms). Let's extend our connection manager to support this:

python
class AdvancedConnectionManager:
def __init__(self):
# Map room_id -> list of connections in that room
self.rooms: Dict[str, List[WebSocket]] = {}

async def connect(self, websocket: WebSocket, room_id: str):
await websocket.accept()

# Create room if it doesn't exist
if room_id not in self.rooms:
self.rooms[room_id] = []

# Add connection to the room
self.rooms[room_id].append(websocket)

def disconnect(self, websocket: WebSocket, room_id: str):
# Remove connection from the room
self.rooms[room_id].remove(websocket)

# Remove room if empty
if not self.rooms[room_id]:
del self.rooms[room_id]

async def broadcast(self, message: Dict[str, Any], room_id: str):
# Broadcast only to connections in the specified room
if room_id in self.rooms:
for connection in self.rooms[room_id]:
await connection.send_json(message)

# Create advanced manager
advanced_manager = AdvancedConnectionManager()

Now we can implement room-based chat:

python
@app.websocket("/ws/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
await advanced_manager.connect(websocket, room_id)
try:
# Send a welcome message to the new user
await websocket.send_json({
"type": "system",
"content": f"Welcome to room {room_id}!"
})

# Announce to other users in the room
await advanced_manager.broadcast({
"type": "system",
"content": f"User #{id(websocket)} has joined the room"
}, room_id)

while True:
# Receive and broadcast messages
data = await websocket.receive_text()
await advanced_manager.broadcast({
"type": "message",
"sender_id": id(websocket),
"content": data
}, room_id)

except WebSocketDisconnect:
advanced_manager.disconnect(websocket, room_id)
await advanced_manager.broadcast({
"type": "system",
"content": f"User #{id(websocket)} has left the room"
}, room_id)

Adding User Authentication

In real applications, you'll want to authenticate users before allowing them to join a broadcast. Here's a simple way to add authentication:

python
from fastapi import Depends, HTTPException, status, Cookie
from typing import Optional

# Mock user authentication function
async def get_user_from_token(token: Optional[str] = Cookie(None)):
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated"
)

# In a real app, validate the token and fetch the user
# For this example, we'll just return a mock user
return {"user_id": token, "username": f"User-{token}"}

@app.websocket("/ws/authenticated-chat")
async def authenticated_chat(
websocket: WebSocket,
user: dict = Depends(get_user_from_token)
):
await manager.connect(websocket)
try:
# Welcome message
await websocket.send_json({
"type": "system",
"content": f"Welcome {user['username']}!"
})

# Announce to other users
await manager.broadcast({
"type": "system",
"content": f"{user['username']} has joined the chat"
})

while True:
data = await websocket.receive_text()
# Include user info in the broadcast
await manager.broadcast({
"type": "message",
"content": data,
"username": user["username"],
"user_id": user["user_id"]
})
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast({
"type": "system",
"content": f"{user['username']} has left the chat"
})

Real-World Example: Live Dashboard

Let's implement a more practical example: a live dashboard that broadcasts updates to all viewers.

python
import asyncio
import random
from datetime import datetime

@app.on_event("startup")
async def startup_event():
# Start the data generator task
asyncio.create_task(generate_dashboard_data())

async def generate_dashboard_data():
while True:
# Generate some random metrics
data = {
"timestamp": str(datetime.now()),
"metrics": {
"cpu_usage": random.randint(10, 95),
"memory_usage": random.randint(20, 80),
"active_users": random.randint(5, 1000),
"requests_per_minute": random.randint(10, 5000)
}
}

# Broadcast to all dashboard viewers
await advanced_manager.broadcast(data, "dashboard")

# Wait for a bit before sending the next update
await asyncio.sleep(2)

@app.websocket("/ws/dashboard")
async def dashboard_feed(websocket: WebSocket):
await advanced_manager.connect(websocket, "dashboard")
try:
# Keep the connection alive
while True:
# We still need to receive messages to detect disconnects
# but we don't need to do anything with them
_ = await websocket.receive_text()
except WebSocketDisconnect:
advanced_manager.disconnect(websocket, "dashboard")

In this example, a background task continuously generates random dashboard data and broadcasts it to all clients connected to the dashboard room. The clients just need to connect and listen for updates.

Frontend Implementation

Here's a simple JavaScript example to connect to our broadcasting WebSocket endpoint:

javascript
// Chat room client example
const roomId = "general";
const socket = new WebSocket(`ws://localhost:8000/ws/chat/${roomId}`);

socket.onopen = () => {
console.log("Connected to chat room:", roomId);
};

socket.onmessage = (event) => {
const data = JSON.parse(event.data);

if (data.type === "system") {
console.log("System message:", data.content);
} else if (data.type === "message") {
console.log(`User #${data.sender_id}: ${data.content}`);
}
};

// Function to send a message to the room
function sendMessage(message) {
if (socket.readyState === WebSocket.OPEN) {
socket.send(message);
}
}

// Dashboard client example
const dashboardSocket = new WebSocket("ws://localhost:8000/ws/dashboard");
dashboardSocket.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Dashboard update:", data);
updateDashboardUI(data); // Function to update the UI with new data
};

Handling Connection Errors and Reconnection

In real-world applications, you should handle connection errors and implement reconnection logic:

javascript
// Recommended client-side reconnection logic
function connectWebSocket(url, options = {}) {
const {
maxRetries = 5,
retryDelay = 1000,
onMessage,
onOpen,
onClose
} = options;

let retries = 0;
let socket;

function connect() {
socket = new WebSocket(url);

socket.onopen = () => {
console.log('WebSocket connected');
retries = 0;
if (onOpen) onOpen(socket);
};

socket.onmessage = (event) => {
if (onMessage) onMessage(JSON.parse(event.data), socket);
};

socket.onclose = (event) => {
if (onClose) onClose(event);

if (!event.wasClean && retries < maxRetries) {
retries++;
const delay = retryDelay * Math.pow(2, retries - 1);
console.log(`Connection closed. Retrying in ${delay}ms...`);
setTimeout(connect, delay);
}
};

socket.onerror = (error) => {
console.error('WebSocket error:', error);
};
}

connect();
return {
getSocket: () => socket,
send: (data) => {
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(typeof data === 'string' ? data : JSON.stringify(data));
}
}
};
}

// Usage
const chat = connectWebSocket(`ws://localhost:8000/ws/chat/general`, {
onMessage: (data) => {
console.log('Received:', data);
},
onOpen: () => {
chat.send('Hello everyone!');
}
});

Performance Considerations

When implementing WebSocket broadcasting, consider these performance tips:

  1. Limit connection state: Store only what's necessary in memory per connection
  2. Use connection pools: For large-scale applications, consider limiting the maximum number of connections
  3. Consider message queues: For very large deployments, use message brokers like Redis, RabbitMQ, or Kafka
  4. Implement rate limiting: Prevent abuse by limiting how frequently clients can broadcast messages

For scaling to thousands or millions of users, consider using specialized WebSocket services or implementing a distributed system with multiple WebSocket servers coordinated through a message queue.

Summary

WebSocket broadcasting in FastAPI enables powerful real-time applications by allowing the server to send messages to multiple connected clients simultaneously. In this tutorial, we've covered:

  • Creating a basic connection manager to track WebSocket connections
  • Broadcasting messages to all connected clients
  • Implementing room-based broadcasting for group communications
  • Adding authentication to secure WebSocket connections
  • Building a practical live dashboard example
  • Client-side connection and reconnection logic

With these techniques, you can build a wide range of real-time applications, from chat services and collaborative editors to live dashboards and multiplayer games.

Additional Resources

Exercises

  1. Extend the chat room example to store the last 10 messages and send them to new users when they join
  2. Implement private messaging functionality where a user can send a message to a specific user
  3. Add typing indicators that show when someone is typing a message
  4. Create a simple collaborative drawing application where all users can see others' drawings in real-time
  5. Implement a notification system that sends updates to specific users based on their subscriptions


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)