
Echo WebSocket Scaling

Introduction

WebSockets enable real-time, bidirectional communication between clients and servers, making them ideal for applications like chat systems, live dashboards, and collaborative tools. However, as your application grows and attracts more users, you'll need to consider how to scale your WebSocket infrastructure to maintain performance and reliability.

In this guide, we'll explore strategies for scaling Echo WebSocket applications to handle thousands or even millions of concurrent connections efficiently. We'll discuss architectural patterns, load balancing techniques, and best practices to ensure your real-time applications remain responsive under heavy loads.

Understanding WebSocket Scaling Challenges

Before diving into solutions, let's understand the unique challenges that WebSockets present for scaling:

  1. Persistent Connections: Unlike HTTP requests, WebSocket connections remain open, consuming server resources for extended periods.
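  2. State Management: Servers often keep per-connection state (user identity, room membership) in memory, which ties each client to the server it connected to and makes load balancing more complex.
  3. Message Broadcasting: Real-time applications frequently need to send messages to many clients simultaneously.
  4. Connection Handling: Every open connection holds a file descriptor, read/write buffers, and at least one goroutine, so per-connection overhead adds up quickly at high connection counts.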

Vertical vs. Horizontal Scaling

Vertical Scaling

Vertical scaling involves increasing the resources (CPU, RAM) of your existing server to handle more connections.

go
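package main

import (
	"net/http"
	"time"

	"github.com/labstack/echo/v4"
)

func main() {
	// Configure Echo with an explicit http.Server so timeouts and header limits are under your control
	e := echo.New()
	s := &http.Server{
		Addr:           ":8000",
		ReadTimeout:    30 * time.Second, // Applies to the HTTP handshake; gorilla/websocket clears deadlines after hijacking
		WriteTimeout:   30 * time.Second,
		MaxHeaderBytes: 1 << 20, // 1 MiB
	}
	e.Logger.Fatal(e.StartServer(s))
}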

Pros:

  • Simpler to implement - no distribution complexity
  • No need for distributing state across servers

Cons:

  • Has physical hardware limitations
  • Single point of failure
  • Cost rises steeply as you move to high-end hardware

Horizontal Scaling

Horizontal scaling involves adding more server instances to distribute the load.


This approach requires:

  1. Load balancing across multiple servers
  2. Shared state management
  3. Message routing between servers

Load Balancing WebSockets

When scaling horizontally, proper load balancing is crucial. Here are some considerations:

Sticky Sessions

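A WebSocket connection stays on the server that accepted it for its whole lifetime, so the load balancer only decides where each new connection goes. Sticky sessions route a returning client (for example, after a reconnect) back to the same server, which helps when that server holds in-memory state for the client: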

nginx
# Example Nginx configuration for WebSocket sticky sessions
upstream websocket_servers {
    ip_hash; # Routes clients to the same server based on IP
    server ws1.example.com:8000;
    server ws2.example.com:8000;
    server ws3.example.com:8000;
}

server {
    listen 80;

    location /ws {
        proxy_pass http://websocket_servers;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        # Nginx closes proxied connections that stay idle longer than this (default 60s);
        # raise it, or send pings more often than this interval.
        proxy_read_timeout 3600s;
    }
}

Connection Distribution

For initial connection distribution, you can use:

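  1. Round-robin: Distributes new connections sequentially. Because WebSocket connections are long-lived, servers can still end up unevenly loaded.
  2. Least connections: Routes each new connection to the server with the fewest active connections, which usually suits long-lived WebSocket traffic.
  3. IP hash: Consistently maps clients to specific servers; clients behind a shared NAT or proxy all land on the same server.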
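The least-connections strategy can be sketched in a few lines of Go. This is a hypothetical in-process picker (Backend, Balancer, Acquire, and Release are illustrative names, not part of Echo or Nginx) that assumes the balancer is told when each connection closes:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Backend is a hypothetical upstream WebSocket server with its live connection count.
type Backend struct {
	Addr   string
	Active int
}

// Balancer picks backends using the least-connections strategy.
type Balancer struct {
	mu       sync.Mutex
	backends []*Backend
}

// Acquire returns the backend with the fewest active connections and
// increments its count. Ties go to the earliest backend in the list.
func (b *Balancer) Acquire() (*Backend, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backends) == 0 {
		return nil, errors.New("no backends available")
	}
	best := b.backends[0]
	for _, be := range b.backends[1:] {
		if be.Active < best.Active {
			best = be
		}
	}
	best.Active++
	return best, nil
}

// Release decrements the count when a WebSocket connection closes.
func (b *Balancer) Release(be *Backend) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if be.Active > 0 {
		be.Active--
	}
}

func main() {
	lb := &Balancer{backends: []*Backend{{Addr: "ws1:8000"}, {Addr: "ws2:8000"}, {Addr: "ws3:8000"}}}
	for i := 0; i < 5; i++ {
		be, _ := lb.Acquire()
		fmt.Println(be.Addr) // ws1, ws2, ws3, ws1, ws2
	}
}
```

In Nginx, the built-in equivalent is the least_conn directive inside the upstream block, used instead of ip_hash.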

State Management Across Nodes

To maintain application state across multiple server instances, consider these approaches:

Shared Redis Pub/Sub

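Redis Pub/Sub is a simple way to relay messages between servers: every instance subscribes to a shared channel, a message published by any instance is delivered to all subscribers, and each instance then forwards it to its own local clients. Pub/Sub is fire-and-forget, so an instance that is disconnected from Redis when a message is published never receives it: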

go
package main

import (
	"context"
	"log"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
)

var (
	redisClient *redis.Client
	upgrader    = websocket.Upgrader{}
)

func main() {
	e := echo.New()

	// Initialize Redis client
	redisClient = redis.NewClient(&redis.Options{
		Addr: "redis:6379",
	})

	// Subscribe to the Redis channel used for cross-server broadcasts
	ctx := context.Background()
	pubsub := redisClient.Subscribe(ctx, "broadcast")

	// Forward every message from Redis to this server's WebSocket clients
	go func() {
		for msg := range pubsub.Channel() {
			broadcastToClients(msg.Payload)
		}
	}()

	e.GET("/ws", handleWebSocket)
	e.Logger.Fatal(e.Start(":8000"))
}

func handleWebSocket(c echo.Context) error {
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	defer ws.Close()

	// Register ws with your local client registry here, and unregister it on exit.

	// Publish every incoming message so all servers (including this one) relay it
	for {
		_, p, err := ws.ReadMessage()
		if err != nil {
			return nil // Client disconnected
		}
		broadcastMessage(string(p))
	}
}

func broadcastMessage(message string) {
	ctx := context.Background()
	// Publish message to all servers subscribed to "broadcast"
	if err := redisClient.Publish(ctx, "broadcast", message).Err(); err != nil {
		log.Printf("redis publish failed: %v", err)
	}
}

func broadcastToClients(message string) {
	// Send to all connected WebSocket clients on this server.
	// Implementation depends on how you track client connections.
}
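broadcastToClients is left as a stub above. One minimal way to fill it, sketched under the assumption that each server keeps an in-memory set of its own connections: the messageWriter interface is a hypothetical stand-in for *websocket.Conn (whose WriteMessage and Close methods have these signatures), so the example runs without external packages, and fakeConn is only a test double.

```go
package main

import (
	"fmt"
	"sync"
)

// messageWriter is the subset of *websocket.Conn this sketch needs.
type messageWriter interface {
	WriteMessage(messageType int, data []byte) error
	Close() error
}

// textMessage mirrors websocket.TextMessage (value 1 in gorilla/websocket).
const textMessage = 1

// clientSet is a hypothetical per-server registry of local connections.
type clientSet struct {
	mu      sync.Mutex
	clients map[messageWriter]struct{}
}

func newClientSet() *clientSet {
	return &clientSet{clients: make(map[messageWriter]struct{})}
}

func (s *clientSet) add(c messageWriter) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.clients[c] = struct{}{}
}

func (s *clientSet) remove(c messageWriter) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.clients, c)
}

// broadcast writes message to every local client, closes and drops any
// client whose write fails, and returns the number of successful writes.
func (s *clientSet) broadcast(message []byte) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	delivered := 0
	for c := range s.clients {
		if err := c.WriteMessage(textMessage, message); err != nil {
			c.Close()
			delete(s.clients, c)
			continue
		}
		delivered++
	}
	return delivered
}

// fakeConn is a test double that records writes or fails on demand.
type fakeConn struct {
	fail bool
	got  [][]byte
}

func (f *fakeConn) WriteMessage(_ int, data []byte) error {
	if f.fail {
		return fmt.Errorf("write failed")
	}
	f.got = append(f.got, data)
	return nil
}

func (f *fakeConn) Close() error { return nil }

func main() {
	s := newClientSet()
	good, bad := &fakeConn{}, &fakeConn{fail: true}
	s.add(good)
	s.add(bad)
	fmt.Println("delivered:", s.broadcast([]byte("hello"))) // delivered: 1
	fmt.Println("remaining:", len(s.clients))               // remaining: 1
}
```

In the Redis example, handleWebSocket would call add after upgrading and remove before returning, and broadcastToClients would call broadcast([]byte(message)). Holding the lock while writing keeps writes to each connection serialized, at the cost of letting one slow client delay everyone else.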

Alternative Message Brokers

For larger scale applications, consider:

  1. NATS: Lightweight, high-performance messaging system
  2. RabbitMQ: Robust message broker with extensive features
  3. Apache Kafka: High-throughput distributed messaging system

Room-Based Broadcasting Optimization

Most real-time applications don't need to send every message to every user. Grouping clients into rooms means each message is written only to the connections that need it:

go
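import (
	"sync"

	"github.com/gorilla/websocket"
)

type Room struct {
	ID      string
	Clients map[*websocket.Conn]bool
	mutex   sync.Mutex
}

var (
	rooms      = make(map[string]*Room)
	roomsMutex sync.Mutex
)

func createOrJoinRoom(roomID string, client *websocket.Conn) *Room {
	roomsMutex.Lock()
	defer roomsMutex.Unlock()

	room, exists := rooms[roomID]
	if !exists {
		room = &Room{
			ID:      roomID,
			Clients: make(map[*websocket.Conn]bool),
		}
		rooms[roomID] = room
	}

	room.mutex.Lock()
	room.Clients[client] = true
	room.mutex.Unlock()

	return room
}

// Broadcast writes message to every client in the room. Holding the room lock
// serializes writes made through this room (gorilla/websocket allows only one
// concurrent writer per connection), but one slow client delays the whole room.
func (r *Room) Broadcast(message []byte) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	for client := range r.Clients {
		if err := client.WriteMessage(websocket.TextMessage, message); err != nil {
			// Drop clients whose connection failed; deleting during range is safe in Go
			client.Close()
			delete(r.Clients, client)
		}
	}
}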

To reach room members who are connected to other servers, publish room messages through Redis with the room ID attached:

go
func broadcastToRoom(roomID string, message []byte) error {
	ctx := context.Background()
	// Prefix the payload with the room ID so subscribers know where to deliver it
	payload := fmt.Sprintf("%s:%s", roomID, message)
	return redisClient.Publish(ctx, "room_messages", payload).Err()
}
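On the receiving side, each server has to split the room ID back off the payload before delivering it locally. A minimal sketch, assuming room IDs never contain a colon (encodeRoomPayload and decodeRoomPayload are hypothetical helpers): splitting at the first colon keeps any colons inside the message intact.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// encodeRoomPayload builds the "roomID:message" format published to Redis.
func encodeRoomPayload(roomID string, message []byte) (string, error) {
	if strings.Contains(roomID, ":") {
		return "", errors.New("room ID must not contain ':'")
	}
	return roomID + ":" + string(message), nil
}

// decodeRoomPayload splits at the first ':'; the message itself may contain colons.
func decodeRoomPayload(payload string) (string, []byte, error) {
	roomID, rest, found := strings.Cut(payload, ":")
	if !found || roomID == "" {
		return "", nil, fmt.Errorf("malformed payload %q", payload)
	}
	return roomID, []byte(rest), nil
}

func main() {
	p, _ := encodeRoomPayload("lobby", []byte(`{"text":"meet at 12:30"}`))
	room, msg, _ := decodeRoomPayload(p)
	fmt.Println(room)        // lobby
	fmt.Println(string(msg)) // {"text":"meet at 12:30"}
}
```

A JSON envelope, as used in the chat application below, avoids the delimiter restriction entirely.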

Connection Pooling and Management

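A thread-safe registry that maps client IDs to connections lets any part of the server look up and message a specific user. A sync.RWMutex allows concurrent lookups while adds and removes take exclusive access: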

go
type ConnectionManager struct {
	connections map[string]*websocket.Conn
	mutex       sync.RWMutex
}

func NewConnectionManager() *ConnectionManager {
	return &ConnectionManager{
		connections: make(map[string]*websocket.Conn),
	}
}

func (cm *ConnectionManager) Add(id string, conn *websocket.Conn) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	cm.connections[id] = conn
}

func (cm *ConnectionManager) Remove(id string) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()
	delete(cm.connections, id)
}

func (cm *ConnectionManager) Get(id string) (*websocket.Conn, bool) {
	cm.mutex.RLock()
	defer cm.mutex.RUnlock()
	conn, exists := cm.connections[id]
	return conn, exists
}

Health Checks and Circuit Breaking

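Expose a health check endpoint so the load balancer can stop routing new connections to an instance whose critical dependencies (here, Redis) are unavailable: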

go
// Health check endpoint
e.GET("/health", func(c echo.Context) error {
	// Check critical dependencies
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	if err := redisClient.Ping(ctx).Err(); err != nil {
		return c.JSON(http.StatusServiceUnavailable, map[string]string{
			"status": "unhealthy",
			"reason": "redis connection failed",
		})
	}

	return c.JSON(http.StatusOK, map[string]string{
		"status": "healthy",
	})
})

Real-World Example: Scalable Chat Application

Let's build a scalable WebSocket chat application using Echo and Redis:

go
package main

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"log"
	"net/http"
	"sync"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

// Message represents a chat message
type Message struct {
	Room    string `json:"room"`
	Content string `json:"content"`
	Sender  string `json:"sender"`
}

var (
	upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true // Allow all origins for the demo; restrict this in production
		},
	}

	// Redis client for pub/sub
	redisClient *redis.Client

	// Server instance ID for logging
	serverID = "server-" + generateRandomString(5)

	// Room manager
	rooms      = make(map[string]*Room)
	roomsMutex sync.Mutex
)

type Room struct {
	ID      string
	Clients map[*websocket.Conn]string // Maps connection to user ID
	mutex   sync.Mutex
}

func main() {
	// Initialize Echo
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Initialize Redis
	redisClient = redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	// Subscribe to Redis channels
	ctx := context.Background()
	pubsub := redisClient.Subscribe(ctx, "chat_messages")

	// Deliver messages published by any server (including this one) to local clients.
	// This is the only goroutine that writes to connections, so writes never overlap.
	go func() {
		for msg := range pubsub.Channel() {
			var message Message
			if err := json.Unmarshal([]byte(msg.Payload), &message); err == nil {
				broadcastToRoom(message.Room, []byte(msg.Payload), "")
			}
		}
	}()

	// Serve static files
	e.Static("/", "public")

	// WebSocket endpoint
	e.GET("/ws/:room/:user", handleWebSocket)

	// Start server
	e.Logger.Infof("Starting server %s on port 8000", serverID)
	e.Logger.Fatal(e.Start(":8000"))
}

func handleWebSocket(c echo.Context) error {
	roomID := c.Param("room")
	userID := c.Param("user")

	// Upgrade connection to WebSocket
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	defer ws.Close()

	// Join or create room
	joinRoom(roomID, ws, userID)

	// Notify room about new user
	publishMessage(Message{Room: roomID, Sender: "system", Content: userID + " has joined the room"})

	// Handle incoming messages
	for {
		_, p, err := ws.ReadMessage()
		if err != nil {
			leaveRoom(roomID, ws)
			publishMessage(Message{Room: roomID, Sender: "system", Content: userID + " has left the room"})
			return nil
		}

		// Set room and sender on the server so clients cannot post into other rooms or as other users
		var msg Message
		if err := json.Unmarshal(p, &msg); err != nil {
			msg.Content = string(p) // Treat non-JSON input as plain text
		}
		msg.Room = roomID
		msg.Sender = userID

		// Publish the message to Redis so every server delivers it
		publishMessage(msg)
	}
}

func publishMessage(msg Message) {
	payload, err := json.Marshal(msg)
	if err != nil {
		log.Printf("marshal message: %v", err)
		return
	}
	if err := redisClient.Publish(context.Background(), "chat_messages", payload).Err(); err != nil {
		log.Printf("redis publish failed: %v", err)
	}
}

func joinRoom(roomID string, conn *websocket.Conn, userID string) *Room {
	roomsMutex.Lock()
	defer roomsMutex.Unlock()

	room, exists := rooms[roomID]
	if !exists {
		room = &Room{
			ID:      roomID,
			Clients: make(map[*websocket.Conn]string),
		}
		rooms[roomID] = room
	}

	room.mutex.Lock()
	room.Clients[conn] = userID
	room.mutex.Unlock()

	return room
}

func leaveRoom(roomID string, conn *websocket.Conn) {
	roomsMutex.Lock()
	defer roomsMutex.Unlock()

	room, exists := rooms[roomID]
	if !exists {
		return
	}

	room.mutex.Lock()
	delete(room.Clients, conn)
	if len(room.Clients) == 0 {
		delete(rooms, roomID) // Free empty rooms
	}
	room.mutex.Unlock()
}

func broadcastToRoom(roomID string, message []byte, skipConn string) {
	roomsMutex.Lock()
	room, exists := rooms[roomID]
	roomsMutex.Unlock()

	if !exists {
		return
	}

	room.mutex.Lock()
	defer room.mutex.Unlock()

	for conn := range room.Clients {
		if conn.RemoteAddr().String() != skipConn {
			// Write errors are ignored here; the reader loop detects dead connections
			conn.WriteMessage(websocket.TextMessage, message)
		}
	}
}

// generateRandomString returns length random bytes, hex-encoded (2*length characters).
func generateRandomString(length int) string {
	b := make([]byte, length)
	if _, err := rand.Read(b); err != nil {
		return "unknown"
	}
	return hex.EncodeToString(b)
}

Monitoring and Performance Metrics

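Track metrics such as active connections, connection lifetime, and message throughput. Note that a WebSocket handler runs for as long as the connection stays open, so timing the handler measures connection duration rather than request latency: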

go
// Middleware to track WebSocket connections
func trackWebSocketMetrics(next echo.HandlerFunc) echo.HandlerFunc {
	return func(c echo.Context) error {
		startTime := time.Now()

		// Execute the handler; for WebSockets it returns only when the connection closes
		err := next(c)

		// Record metrics (could send to Prometheus or another monitoring system)
		duration := time.Since(startTime)
		logMetric("websocket_connection_duration_ms", duration.Milliseconds(), map[string]string{
			"path":   c.Path(),
			"status": fmt.Sprintf("%d", c.Response().Status),
		})

		return err
	}
}

func logMetric(name string, value int64, tags map[string]string) {
	// Implementation depends on your monitoring system,
	// for example Prometheus or StatsD.
}

// Apply the middleware to the WebSocket route only
e.GET("/ws", handleWebSocket, trackWebSocketMetrics)

Best Practices for WebSocket Scaling

  1. Implement Heartbeats: Regularly exchange ping/pong frames to detect and clean up dead connections.
go
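// Example of implementing heartbeats
const (
	writeWait  = 10 * time.Second    // Time allowed to write a ping frame
	pongWait   = 60 * time.Second    // Time allowed to receive the next pong
	pingPeriod = (pongWait * 9) / 10 // Ping before the read deadline expires
)

func handleWebSocket(c echo.Context) error {
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		return err
	}
	defer ws.Close()

	// Each pong extends the read deadline; a silent peer hits the deadline and ReadMessage fails
	ws.SetReadDeadline(time.Now().Add(pongWait))
	ws.SetPongHandler(func(string) error {
		return ws.SetReadDeadline(time.Now().Add(pongWait))
	})

	// Send pings periodically; closing done stops this goroutine when the handler returns
	done := make(chan struct{})
	defer close(done)
	go func() {
		ticker := time.NewTicker(pingPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// WriteControl may be called concurrently with other methods
				if err := ws.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)); err != nil {
					return
				}
			case <-done:
				return
			}
		}
	}()

	// The pong handler only runs while reading, so keep a read loop going
	for {
		if _, _, err := ws.ReadMessage(); err != nil {
			return nil // Timeout or client disconnect
		}
		// Handle messages...
	}
}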
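  2. Graceful Shutdown: Implement proper shutdown procedures to close connections cleanly.
go
// Clean server shutdown
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)

go func() {
	<-quit
	fmt.Println("Shutting down server...")

	// Notify clients of shutdown (broadcastToAllClients is your own helper)
	broadcastToAllClients([]byte(`{"type":"system","message":"Server is shutting down"}`))

	// Close all WebSocket connections (closeAllConnections is your own helper).
	// http.Server.Shutdown does not close hijacked connections such as WebSockets,
	// so they must be closed explicitly.
	closeAllConnections()

	// Stop accepting new connections and shut down the HTTP server
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := e.Shutdown(ctx); err != nil {
		e.Logger.Error(err)
	}
}()

// After Shutdown, Start returns http.ErrServerClosed, which is not a failure
if err := e.Start(":8000"); err != nil && err != http.ErrServerClosed {
	e.Logger.Fatal(err)
}
  3. Efficient Message Serialization: Use compact message formats, such as short JSON field names or a binary encoding like Protocol Buffers or MessagePack, to reduce bandwidth and encoding cost.

  4. Connection Limiting: Cap the number of connections each server accepts to prevent resource exhaustion; every connection holds a file descriptor, buffers, and at least one goroutine.

  5. Backpressure Handling: Decide what happens when a client cannot read messages as fast as they are produced, for example by bounding each client's outgoing queue and dropping messages or disconnecting clients whose queue is full.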
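The backpressure practice above can be sketched with a bounded channel per client: broadcasters never block on a slow connection, and a full queue signals that the client is falling behind. This is similar to the approach in gorilla/websocket's chat example, but client, enqueue, and fanOut are hypothetical names here, and the writer goroutine that drains send and calls WriteMessage is omitted.

```go
package main

import "fmt"

// client is a hypothetical per-connection wrapper. A dedicated writer
// goroutine (not shown) would drain send and write to the WebSocket.
type client struct {
	id   string
	send chan []byte // Bounded outbound queue
}

func newClient(id string, queueSize int) *client {
	return &client{id: id, send: make(chan []byte, queueSize)}
}

// enqueue queues msg without blocking and reports false if the queue is full.
func (c *client) enqueue(msg []byte) bool {
	select {
	case c.send <- msg:
		return true
	default:
		return false
	}
}

// fanOut offers msg to every client and returns the IDs of clients whose
// queues were full; the caller can drop the message or disconnect them.
func fanOut(clients []*client, msg []byte) []string {
	var slow []string
	for _, c := range clients {
		if !c.enqueue(msg) {
			slow = append(slow, c.id)
		}
	}
	return slow
}

func main() {
	fast, slow := newClient("fast", 2), newClient("slow", 1)
	slow.send <- []byte("backlog") // The slow client's queue is already full
	fmt.Println(fanOut([]*client{fast, slow}, []byte("tick"))) // [slow]
}
```

Whether to drop the message or disconnect the client depends on the application: a chat client can reconnect and catch up, while a live dashboard may prefer to skip stale updates.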

Summary

Scaling WebSocket applications with Echo involves addressing several key challenges:

  1. Architecture: Choose between vertical and horizontal scaling based on your needs.
  2. Load Balancing: Implement proper load balancing with sticky sessions for WebSockets.
  3. State Management: Use Redis or another message broker for cross-server communication.
  4. Connection Management: Efficiently track and manage WebSocket connections.
  5. Room-Based Broadcasting: Optimize message distribution by using room/channel concepts.
  6. Monitoring: Implement comprehensive metrics to track performance and troubleshoot issues.

By following these strategies and best practices, you can build Echo WebSocket applications that scale to handle thousands or even millions of concurrent connections while maintaining performance and stability.


Exercises

  1. Implement a simple chat application that uses Redis for cross-server message broadcasting.
  2. Add room functionality to the chat application so users can join different chat rooms.
  3. Implement connection metrics that track the number of active WebSocket connections.
  4. Create a load testing script to test how many concurrent WebSocket connections your server can handle.
  5. Modify the chat application to handle server shutdown gracefully, notifying clients before closing connections.

