
FastAPI WebSocket Error Handling

When building real-time applications with FastAPI WebSockets, proper error handling is essential for reliability. In this guide, we'll explore how to handle various WebSocket errors, implement exception handling patterns, and establish best practices for maintaining stable WebSocket connections.

Introduction to WebSocket Error Handling

WebSocket connections are long-lived compared to regular HTTP connections, which makes proper error handling even more important. Without it, your application might:

  • Leave connections hanging in an invalid state
  • Fail to notify clients of server-side issues
  • Leak resources due to improperly closed connections
  • Provide poor user experience when errors occur

Let's learn how to handle WebSocket errors properly in FastAPI applications.

Common WebSocket Error Scenarios

Before diving into implementation, let's understand the common error scenarios in WebSocket applications:

  1. Connection establishment failures: Issues that prevent a WebSocket connection from being established
  2. Runtime exceptions: Errors that occur while processing WebSocket messages
  3. Client disconnections: Handling unexpected client disconnections
  4. Timeout errors: Managing connections that become unresponsive
  5. Validation errors: Issues with message format or content validation

Basic Error Handling Structure in FastAPI WebSockets

Let's start with a basic pattern for handling errors in FastAPI WebSockets:

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import logging

app = FastAPI()
logger = logging.getLogger("websocket_app")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # Process the received data
            await websocket.send_text(f"Message processed: {data}")
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        # Try to send an error message if connection is still open
        try:
            await websocket.send_text(f"Error occurred: {str(e)}")
            await websocket.close(code=1011)  # Internal server error
        except Exception:
            # Connection might be closed already
            pass
```

This basic structure catches two types of exceptions:

  • WebSocketDisconnect: When the client disconnects
  • Exception: Any other exceptions that might occur
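Echoing `str(e)` back to the client, as the basic pattern does, can expose internal details such as file paths or query text. One possible alternative, sketched below, logs the full exception on the server and sends the client only a generic payload. The helper name `build_error_payload` and the `error_id` field are illustrative assumptions, not part of FastAPI.

```python
import logging
import uuid

logger = logging.getLogger("websocket_app")


def build_error_payload(exc: Exception) -> dict:
    """Log the real exception server-side and return a client-safe payload."""
    # Short random ID (hypothetical convention) to correlate a client report with a log line
    error_id = uuid.uuid4().hex[:8]
    logger.error("WebSocket error %s: %r", error_id, exc)
    # Nothing from the exception itself is copied into the payload
    return {
        "type": "error",
        "message": "Internal server error",
        "error_id": error_id,
    }
```

Inside the `except Exception as e:` branch, the payload could then be sent with `await websocket.send_json(build_error_payload(e))` before closing with code 1011.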

Handling Connection Establishment Errors

Errors can occur during the WebSocket connection establishment phase. Here's how to handle them:

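```python
from starlette.websockets import WebSocketState

@app.websocket("/ws/secure")
async def secure_websocket_endpoint(websocket: WebSocket):
    try:
        # Validate connection (example: check authentication).
        # authenticate_websocket is a placeholder for your own auth helper.
        user = await authenticate_websocket(websocket)
        if not user:
            # Reject the connection with an error code.
            # Closing before accept() rejects the handshake; ASGI servers
            # typically answer with HTTP 403, so the client may not see the code.
            await websocket.close(code=1008, reason="Authentication failed")
            return

        # Accept the connection only if authentication succeeds
        await websocket.accept()

        # Rest of the WebSocket logic...
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"User {user['name']}: {data}")

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Error in secure WebSocket: {str(e)}")
        # Close only if the connection was accepted and is still open.
        # Compare against the enum member: client_state.CONNECTED alone is always truthy.
        if websocket.client_state == WebSocketState.CONNECTED:
            await websocket.close(code=1011)
```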

The code above shows how to:

  1. Perform authentication before accepting a WebSocket connection
  2. Reject connections with appropriate status codes and reasons
  3. Only proceed with accepting the connection after validation passes
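The `authenticate_websocket` helper is not defined in this guide. As a rough sketch of what its core check might look like, the function below validates a `token` query parameter against an in-memory table. Both the `VALID_TOKENS` table and the `authenticate_token` name are hypothetical; a real application would more likely verify a signed token or query a session store.

```python
import hmac
from typing import Mapping, Optional

# Hypothetical in-memory token table, for illustration only
VALID_TOKENS = {"token-alice": {"name": "alice"}}


def authenticate_token(query_params: Mapping[str, str]) -> Optional[dict]:
    """Return the user for a valid token, or None if authentication fails."""
    token = query_params.get("token", "")
    for known_token, user in VALID_TOKENS.items():
        # compare_digest avoids leaking match position through timing
        if hmac.compare_digest(token.encode(), known_token.encode()):
            return user
    return None
```

In an endpoint this could be called as `authenticate_token(websocket.query_params)` before `accept()`, since the query string is available during the handshake.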

Custom Exception Handler for WebSockets

For more organized error handling, we can create custom exception types and handlers:

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState

class WebSocketException(Exception):
    def __init__(self, code: int = 1000, reason: str = ""):
        self.code = code
        self.reason = reason
        super().__init__(f"WebSocket error: {reason} (code: {code})")

class MessageValidationError(WebSocketException):
    def __init__(self, reason: str = "Invalid message format"):
        super().__init__(code=1007, reason=reason)

app = FastAPI()

async def handle_websocket_exception(
    websocket: WebSocket,
    exception: WebSocketException
) -> None:
    """Handle WebSocket exceptions gracefully."""
    # Compare against the enum member; client_state.CONNECTED alone is always truthy
    if websocket.client_state == WebSocketState.CONNECTED:
        try:
            # Try to send error information to the client
            await websocket.send_json({
                "error": exception.reason,
                "code": exception.code
            })
            await websocket.close(code=exception.code, reason=exception.reason)
        except Exception:
            # If sending fails, just close the connection
            try:
                await websocket.close(code=exception.code)
            except Exception:
                pass
```

Now we can use these custom exceptions in our WebSocket endpoints:

```python
@app.websocket("/ws/messages")
async def message_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()

            # Validate message structure
            if "message" not in data:
                raise MessageValidationError("Missing 'message' field")

            if len(data["message"]) > 200:
                raise MessageValidationError("Message too long (max 200 chars)")

            # Process valid message
            await websocket.send_json({
                "status": "success",
                "message": f"Received: {data['message']}"
            })

    except WebSocketDisconnect:
        logger.info("Client disconnected normally")
    except MessageValidationError as e:
        await handle_websocket_exception(websocket, e)
    except Exception as e:
        # Handle unexpected errors
        logger.error(f"Unexpected error: {str(e)}")
        await handle_websocket_exception(
            websocket,
            WebSocketException(code=1011, reason="Server error")
        )
```

This approach provides:

  1. Custom exception types for different error scenarios
  2. Consistent error handling with proper WebSocket close codes
  3. Informative error messages sent to the client
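The inline checks in the endpoint assume the payload is a JSON object whose `message` value is a string. A slightly more defensive variant, sketched here as a standalone function, also rejects non-object payloads and non-string values. The exception classes are redefined so the snippet runs on its own; `validate_message` and `MAX_MESSAGE_LENGTH` are illustrative names.

```python
from typing import Any


class WebSocketException(Exception):
    def __init__(self, code: int = 1000, reason: str = ""):
        self.code = code
        self.reason = reason
        super().__init__(f"WebSocket error: {reason} (code: {code})")


class MessageValidationError(WebSocketException):
    def __init__(self, reason: str = "Invalid message format"):
        super().__init__(code=1007, reason=reason)


MAX_MESSAGE_LENGTH = 200  # same limit as the endpoint example


def validate_message(data: Any) -> str:
    """Return the message text, or raise MessageValidationError."""
    if not isinstance(data, dict):
        raise MessageValidationError("Message must be a JSON object")
    if "message" not in data:
        raise MessageValidationError("Missing 'message' field")
    text = data["message"]
    if not isinstance(text, str):
        raise MessageValidationError("'message' must be a string")
    if len(text) > MAX_MESSAGE_LENGTH:
        raise MessageValidationError("Message too long (max 200 chars)")
    return text
```

Keeping validation in a plain function like this makes it easy to unit test without opening a WebSocket connection.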

Using WebSocket Close Codes Effectively

The WebSocket protocol (RFC 6455) defines specific close codes that should be used for different scenarios:

```python
# Example of using proper WebSocket close codes
async def close_with_appropriate_code(websocket: WebSocket, situation: str):
    if situation == "normal":
        # Normal closure
        await websocket.close(code=1000)
    elif situation == "going_away":
        # Server is shutting down or client navigating away
        await websocket.close(code=1001, reason="Server shutting down")
    elif situation == "protocol_error":
        # Protocol error
        await websocket.close(code=1002, reason="Protocol error")
    elif situation == "invalid_data":
        # Received a data type that cannot be accepted (e.g., binary on a text-only endpoint)
        await websocket.close(code=1003, reason="Unsupported data")
    elif situation == "policy_violation":
        # Policy violation (e.g., failed authentication or a forbidden action);
        # an oversized message has its own code, 1009
        await websocket.close(code=1008, reason="Policy violation")
    elif situation == "server_error":
        # Server internal error
        await websocket.close(code=1011, reason="Internal server error")
```

Using the correct close codes helps clients understand why the connection was terminated and how to respond appropriately.
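To avoid scattering numeric literals through handlers, the codes can be collected in one place. The sketch below defines a small enum of the RFC 6455 codes used in this guide, plus a helper that trims the close reason: a close frame's payload is limited to 125 bytes, 2 of which hold the code, leaving 123 bytes of UTF-8 text. `CloseCode` and `safe_reason` are illustrative names, not FastAPI APIs.

```python
from enum import IntEnum


class CloseCode(IntEnum):
    NORMAL = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    INVALID_PAYLOAD = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    INTERNAL_ERROR = 1011


MAX_REASON_BYTES = 123  # 125-byte control-frame payload minus the 2-byte code


def safe_reason(reason: str) -> str:
    """Trim a close reason so its UTF-8 encoding fits in a close frame."""
    encoded = reason.encode("utf-8")[:MAX_REASON_BYTES]
    # A cut through a multi-byte character leaves a partial sequence; drop it
    return encoded.decode("utf-8", errors="ignore")
```

A call might then look like `await websocket.close(code=CloseCode.POLICY_VIOLATION, reason=safe_reason(text))`. FastAPI's `status` module also provides constants such as `status.WS_1008_POLICY_VIOLATION` if you prefer not to define your own.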

Timeout Handling for WebSockets

Long-running WebSocket connections might become stale. Here's how to implement timeout handling:

```python
import asyncio

@app.websocket("/ws/with-timeout")
async def websocket_with_timeout(websocket: WebSocket):
    await websocket.accept()

    # Set a 30-second timeout for receiving messages
    try:
        last_activity = asyncio.create_task(websocket.receive_text())
        while True:
            # Wait for either a message or timeout
            done, pending = await asyncio.wait(
                [last_activity],
                timeout=30
            )

            # If no message received within timeout
            if not done:
                # Cancel the pending receive task
                for task in pending:
                    task.cancel()

                await websocket.send_text("Connection timed out due to inactivity")
                await websocket.close(code=1000, reason="Inactivity timeout")
                break

            # Process the message
            message = done.pop().result()
            await websocket.send_text(f"Echo: {message}")

            # Start waiting for the next message
            last_activity = asyncio.create_task(websocket.receive_text())

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        try:
            await websocket.close(code=1011)
        except Exception:
            # Connection might be closed already
            pass
```

This implementation:

  1. Sets a 30-second inactivity timeout
  2. Uses asyncio.wait() with a timeout to detect inactivity
  3. Sends a notification to the client before closing
  4. Properly cleans up pending tasks
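When only one awaitable is being watched, `asyncio.wait_for()` is a more compact alternative to managing tasks by hand: it cancels the pending call itself when the timeout expires. The sketch below wraps any zero-argument receive coroutine function; `receive_with_timeout` is an illustrative helper name, and returning `None` on timeout is a design choice made for this example.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def receive_with_timeout(
    receive: Callable[[], Awaitable[str]],
    timeout: float,
) -> Optional[str]:
    """Await one message; return None if nothing arrives within `timeout` seconds."""
    try:
        # wait_for cancels the inner receive() call when the timeout expires
        return await asyncio.wait_for(receive(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
```

Inside an endpoint the loop body could become `message = await receive_with_timeout(websocket.receive_text, 30)`, followed by a close with an inactivity reason when `message is None`.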

Implementing Heartbeat Mechanism

A common pattern for keeping WebSocket connections alive and detecting disconnections is implementing a heartbeat mechanism:

```python
import asyncio
import logging
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
logger = logging.getLogger("websocket_app")

@app.websocket("/ws/heartbeat")
async def websocket_heartbeat(websocket: WebSocket):
    await websocket.accept()

    # Heartbeat interval in seconds
    heartbeat_interval = 5

    async def send_heartbeat():
        while True:
            try:
                await websocket.send_json({"type": "heartbeat"})
                await asyncio.sleep(heartbeat_interval)
            except Exception:
                # If sending heartbeat fails, exit the task
                break

    heartbeat_task = asyncio.create_task(send_heartbeat())

    try:
        while True:
            message = await websocket.receive_json()

            # Check if it's a heartbeat acknowledgment
            if message.get("type") == "heartbeat_ack":
                continue

            # Process regular messages
            await websocket.send_json({
                "type": "message",
                "content": f"Received: {message.get('content', '')}"
            })

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Error: {str(e)}")
    finally:
        # Clean up the heartbeat task
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
```

The heartbeat mechanism:

  1. Sends a periodic heartbeat message to the client
  2. Expects clients to respond with heartbeat acknowledgments
  3. Surfaces dead connections when a heartbeat send fails; note that the example does not yet enforce a deadline for acknowledgments
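The endpoint above ignores `heartbeat_ack` messages rather than tracking them, so a client that silently stops responding is only noticed once a send fails. One way to add a deadline, sketched under the assumption that any acknowledgment counts as proof of life, is a small tracker that records the time of the last ack. `HeartbeatMonitor` is a hypothetical helper; the injectable `clock` exists only to make it testable.

```python
import time
from typing import Callable


class HeartbeatMonitor:
    """Track the most recent heartbeat acknowledgment from one client."""

    def __init__(self, timeout: float, clock: Callable[[], float] = time.monotonic):
        self._timeout = timeout
        self._clock = clock
        # Treat connection time as the first sign of life
        self._last_ack = clock()

    def record_ack(self) -> None:
        """Call this whenever a heartbeat_ack message arrives."""
        self._last_ack = self._clock()

    def is_stale(self) -> bool:
        """True if no ack has arrived within the timeout window."""
        return self._clock() - self._last_ack > self._timeout
```

In the endpoint, the receive loop could call `record_ack()` where it currently does `continue`, and the `send_heartbeat` task could check `is_stale()` before each send and close the connection when it returns `True`.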

Real-World Application: Chat Room with Error Handling

Let's build a complete example of a chat room with comprehensive error handling:

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Optional
import logging
import json

app = FastAPI()
logger = logging.getLogger("chat_app")

# Store active connections
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def broadcast(self, message: str, exclude_user: Optional[str] = None):
        disconnected_users = []

        # Iterate over a snapshot: other tasks may add or remove
        # connections while this coroutine is awaiting a send
        for user_id, connection in list(self.active_connections.items()):
            if user_id != exclude_user:
                try:
                    await connection.send_text(message)
                except Exception as e:
                    logger.error(f"Error broadcasting to {user_id}: {str(e)}")
                    disconnected_users.append(user_id)

        # Clean up disconnected users
        for user_id in disconnected_users:
            self.disconnect(user_id)

manager = ConnectionManager()

@app.websocket("/ws/chat/{user_id}")
async def chat_websocket(websocket: WebSocket, user_id: str):
    try:
        # Basic validation
        if not user_id or len(user_id) < 3:
            await websocket.close(code=1008, reason="Invalid user ID")
            return

        # Check for duplicate user
        if user_id in manager.active_connections:
            await websocket.close(code=1008, reason="User ID already in use")
            return

        # Accept connection and add to manager
        await manager.connect(user_id, websocket)

        # Notify everyone about new user
        await manager.broadcast(
            json.dumps({
                "type": "system",
                "message": f"User {user_id} has joined the chat"
            })
        )

        try:
            # Process messages
            while True:
                data = await websocket.receive_text()

                try:
                    # Try to parse as JSON
                    message_data = json.loads(data)

                    # Validate message structure (must be an object with a 'message' field)
                    if not isinstance(message_data, dict) or "message" not in message_data:
                        await websocket.send_json({
                            "type": "error",
                            "message": "Invalid message format: missing 'message' field"
                        })
                        continue

                    # Check message length
                    if len(message_data["message"]) > 500:
                        await websocket.send_json({
                            "type": "error",
                            "message": "Message too long (max 500 chars)"
                        })
                        continue

                    # Broadcast valid message
                    await manager.broadcast(
                        json.dumps({
                            "type": "message",
                            "user": user_id,
                            "message": message_data["message"]
                        }),
                        exclude_user=None  # Send to everyone including sender
                    )

                except json.JSONDecodeError:
                    await websocket.send_json({
                        "type": "error",
                        "message": "Invalid JSON format"
                    })

        except WebSocketDisconnect:
            # Client disconnected, clean up
            manager.disconnect(user_id)
            await manager.broadcast(
                json.dumps({
                    "type": "system",
                    "message": f"User {user_id} has left the chat"
                })
            )

    except Exception as e:
        logger.exception(f"Unexpected error in chat WebSocket: {str(e)}")
        try:
            await websocket.close(code=1011)
        except Exception:
            pass

        # Make sure to clean up, but only this socket's own registration
        if manager.active_connections.get(user_id) is websocket:
            manager.disconnect(user_id)
```
This comprehensive chat application demonstrates:

  1. Connection validation before accepting WebSockets
  2. Handling duplicate user IDs
  3. JSON message parsing and validation
  4. Error responses to invalid messages
  5. Proper disconnection and cleanup
  6. System notifications for user joins and leaves
  7. Exception handling at multiple levels
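The manager's `broadcast` sends to one client at a time, so a single slow client delays everyone after it. A possible variation, sketched here independently of FastAPI, sends concurrently with `asyncio.gather(..., return_exceptions=True)` and reports which recipients failed. The `Sender` protocol is an assumption standing in for any object with an async `send_text` method, such as a `WebSocket`.

```python
import asyncio
from typing import Dict, List, Protocol


class Sender(Protocol):
    async def send_text(self, message: str) -> None: ...


async def broadcast(connections: Dict[str, Sender], message: str) -> List[str]:
    """Send to every connection concurrently; return the IDs whose send failed."""
    # Snapshot first so later changes to the dict cannot affect this round
    snapshot = list(connections.items())
    results = await asyncio.gather(
        *(conn.send_text(message) for _, conn in snapshot),
        return_exceptions=True,  # collect failures instead of aborting the batch
    )
    return [
        user_id
        for (user_id, _), result in zip(snapshot, results)
        if isinstance(result, BaseException)
    ]
```

The caller can then remove the returned IDs from its connection table, which mirrors the cleanup step in `ConnectionManager.broadcast`.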

Best Practices for WebSocket Error Handling

Based on the examples we've covered, here are some best practices for WebSocket error handling in FastAPI:

  1. Validate before accepting: Perform validation checks before accepting the WebSocket connection
  2. Use appropriate close codes: Send the correct WebSocket close code for different error scenarios
  3. Implement timeouts: Use timeouts to prevent connections from hanging indefinitely
  4. Add heartbeat mechanisms: Implement heartbeats to detect stale connections
  5. Provide meaningful error messages: Send clear error messages that help clients understand what went wrong
  6. Clean up resources: Make sure to clean up all resources when connections end
  7. Log errors: Log errors on the server for debugging and monitoring
  8. Handle disconnects gracefully: Have plans for handling unexpected disconnections
  9. Use custom exceptions: Create custom exception types for different error scenarios
  10. Implement reconnection strategies: On the client side, implement reconnection logic with backoff
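For the reconnection practice in the last item, the delay schedule is the part that is easiest to get wrong. The sketch below generates capped exponential delays with optional full jitter, which spreads out reconnect attempts from many clients. It is written in Python for consistency with the rest of the guide, although browser clients would implement the same idea in JavaScript; the function name and default values are illustrative assumptions.

```python
import random
from typing import Iterator


def backoff_delays(
    base: float = 1.0,
    factor: float = 2.0,
    cap: float = 30.0,
    attempts: int = 6,
    jitter: bool = True,
) -> Iterator[float]:
    """Yield one wait time (in seconds) per reconnection attempt."""
    for attempt in range(attempts):
        # Exponential growth, never exceeding the cap
        delay = min(cap, base * factor ** attempt)
        # Full jitter: pick a random point in [0, delay]
        yield random.uniform(0, delay) if jitter else delay
```

A client loop would sleep for each yielded delay between connection attempts and stop iterating as soon as a connection succeeds.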

Summary

Proper error handling is a crucial aspect of building robust WebSocket applications with FastAPI. By implementing the strategies covered in this guide, you can create more reliable real-time applications that gracefully handle failures, maintain connection integrity, and provide a better experience for your users.

We've explored:

  • Basic error handling structure
  • Connection establishment error handling
  • Custom exception types for WebSockets
  • WebSocket close codes and their meanings
  • Timeout and heartbeat mechanisms
  • A comprehensive chat application with error handling

By applying these techniques, you can build WebSocket applications that are resilient, maintainable, and provide proper feedback to users when things go wrong.


Exercises

  1. Basic Error Handler: Implement a WebSocket endpoint that validates received messages and returns appropriate error messages for invalid inputs.

  2. Timeout Handler: Create a WebSocket endpoint with a configurable timeout that closes inactive connections.

  3. Chat Application: Extend the chat application example to include private messaging with error handling for non-existent users.

  4. Reconnection Strategy: Implement a client-side reconnection strategy with exponential backoff for a WebSocket connection.

  5. Rate Limiting: Add rate limiting to a WebSocket endpoint to prevent spamming and implement proper error responses when limits are exceeded.


