Echo WebSocket Broadcast
WebSockets provide a powerful way to establish bidirectional communication between clients and servers. One of the most valuable features of WebSockets is the ability to broadcast messages to multiple connected clients simultaneously. In this tutorial, we'll explore how to implement WebSocket broadcasting using the Echo framework.
What is Broadcasting?
Broadcasting refers to the process of sending a single message from the server to multiple connected clients. This is particularly useful for:
- Chat applications where messages need to be delivered to all participants
- Real-time dashboards that need to update all viewers simultaneously
- Collaborative tools where changes made by one user need to be reflected for all users
- Live notifications that should appear for all active users
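Stripped of WebSocket details, broadcasting is a loop that copies one message into every client's outbound queue. The sketch below illustrates the idea with plain Go channels; the `broadcast` function and the string-keyed map are illustrative assumptions, not part of Echo or any WebSocket library.

```go
package main

// broadcast copies one message into every client's outbound queue.
// Each queue stands in for a connected client in this simplified sketch.
func broadcast(queues map[string]chan string, msg string) {
	for _, q := range queues {
		q <- msg // blocks if this client's queue is full
	}
}
```

Because the send blocks when a queue is full, a single slow client would stall everyone else. The Hub built later in this tutorial avoids that with a non-blocking send.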
Prerequisites
Before we dive into broadcasting, make sure you have:
- Go installed on your system
- Basic knowledge of Go programming
- Familiarity with the Echo framework
- Understanding of WebSocket basics
Setting Up a WebSocket Hub
To implement broadcasting, we need a way to keep track of all connected clients. We'll create a Hub structure that manages WebSocket connections:
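// Client represents one connected WebSocket peer.
type Client struct {
	ID   string
	Conn *websocket.Conn
	Send chan []byte // buffered queue of outbound messages
	Hub  *Hub
}

// Hub tracks the connected clients and fans messages out to them.
type Hub struct {
	Clients    map[*Client]bool
	Register   chan *Client
	Unregister chan *Client
	Broadcast  chan []byte
	mutex      sync.Mutex
}

// NewHub returns a Hub with its map and channels initialized.
func NewHub() *Hub {
	return &Hub{
		Clients:    make(map[*Client]bool),
		Register:   make(chan *Client),
		Unregister: make(chan *Client),
		Broadcast:  make(chan []byte),
	}
}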
Starting the Hub
The Hub needs to run in a goroutine to handle registrations, unregistrations, and broadcasts:
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.Register:
			h.mutex.Lock()
			h.Clients[client] = true
			h.mutex.Unlock()
		case client := <-h.Unregister:
			h.mutex.Lock()
			// Close Send only if the client is still registered,
			// so the channel is never closed twice.
			if _, ok := h.Clients[client]; ok {
				delete(h.Clients, client)
				close(client.Send)
			}
			h.mutex.Unlock()
		case message := <-h.Broadcast:
			h.mutex.Lock()
			for client := range h.Clients {
				select {
				case client.Send <- message:
				default:
					// The client's send buffer is full: drop the client
					// instead of blocking the whole hub.
					close(client.Send)
					delete(h.Clients, client)
				}
			}
			h.mutex.Unlock()
		}
	}
}
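The `select` with a `default` branch in the broadcast case is what keeps one slow client from stalling the rest. Here is that drop policy in isolation, as a minimal sketch; the `deliver` function and the use of bare channels as map keys are assumptions made so the example stands alone without a WebSocket library.

```go
package main

// deliver attempts a non-blocking send of msg to every client queue.
// Queues that cannot accept the message are closed and removed,
// and the number of dropped queues is returned.
func deliver(clients map[chan []byte]bool, msg []byte) int {
	dropped := 0
	for q := range clients {
		select {
		case q <- msg:
			// Delivered: the queue had room.
		default:
			// Queue full: give up on this client rather than wait.
			close(q)
			delete(clients, q)
			dropped++
		}
	}
	return dropped
}
```

Deleting from a map while ranging over it is safe in Go, which is why the Hub can remove slow clients inside the loop.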
Client Handler Functions
Each client needs to handle reading from and writing to its WebSocket connection:
// ReadPump reads messages from the connection and hands them to the hub.
func (c *Client) ReadPump() {
	defer func() {
		c.Hub.Unregister <- c
		c.Conn.Close()
	}()
	c.Conn.SetReadLimit(512) // Set maximum message size
	c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	// Every pong from the peer extends the read deadline
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})
	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}
		// Broadcast the received message
		c.Hub.Broadcast <- message
	}
}

// WritePump writes queued messages and periodic pings to the connection.
func (c *Client) WritePump() {
	ticker := time.NewTicker(54 * time.Second)
	defer func() {
		ticker.Stop()
		c.Conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.Send:
			c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if !ok {
				// The hub closed the channel
				c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			w, err := c.Conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			w.Write(message)
			// Add queued messages to the current websocket message
			n := len(c.Send)
			for i := 0; i < n; i++ {
				w.Write([]byte{'\n'})
				w.Write(<-c.Send)
			}
			if err := w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			// Send a ping so the peer answers with a pong
			c.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
			if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
Integrating with Echo
Now let's set up the Echo handler for WebSocket connections:
package main

import (
	"log"
	"net/http"
	"sync" // used by Hub
	"time" // used by ReadPump and WritePump

	"github.com/gorilla/websocket"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

var (
	upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		CheckOrigin: func(r *http.Request) bool {
			return true // Allow all origins for this example
		},
	}
	hub = NewHub()
)

func main() {
	// Start the hub
	go hub.Run()

	// Create Echo instance
	e := echo.New()

	// Middleware
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Routes
	e.GET("/ws", handleWebSocket)
	e.Static("/", "public")

	// Start server
	e.Logger.Fatal(e.Start(":8000"))
}

func handleWebSocket(c echo.Context) error {
	// Upgrade the HTTP request to a WebSocket connection
	conn, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
	if err != nil {
		log.Println(err)
		return err
	}
	client := &Client{
		ID:   c.Request().RemoteAddr,
		Conn: conn,
		Send: make(chan []byte, 256),
		Hub:  hub,
	}
	client.Hub.Register <- client

	// Allow collection of memory referenced by the caller by doing all work in new goroutines
	go client.WritePump()
	go client.ReadPump()
	return nil
}
Creating a Simple Chat Application
Let's create a simple chat application to demonstrate WebSocket broadcasting in action. First, we'll create the HTML/JavaScript client:
<!DOCTYPE html>
<html>
<head>
    <title>Echo WebSocket Chat</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            max-width: 800px;
            margin: 0 auto;
        }
        #chat {
            height: 300px;
            border: 1px solid #ccc;
            margin-bottom: 15px;
            padding: 10px;
            overflow-y: scroll;
        }
        #message {
            width: 80%;
            padding: 5px;
        }
        button {
            padding: 5px 10px;
        }
    </style>
</head>
<body>
    <h1>Echo WebSocket Chat</h1>
    <div id="chat"></div>
    <div>
        <input type="text" id="message" placeholder="Type your message here...">
        <button onclick="sendMessage()">Send</button>
    </div>
    <div>
        <p>Connection Status: <span id="status">Connecting...</span></p>
    </div>
    <script>
        let chatDiv = document.getElementById('chat');
        let msgInput = document.getElementById('message');
        let statusSpan = document.getElementById('status');
        let ws;

        function connectWebSocket() {
            ws = new WebSocket('ws://localhost:8000/ws');

            ws.onopen = function() {
                statusSpan.textContent = 'Connected';
                statusSpan.style.color = 'green';
                appendMessage('System', 'Connected to chat server');
            };

            ws.onmessage = function(event) {
                try {
                    let data = JSON.parse(event.data);
                    appendMessage(data.username, data.message);
                } catch (e) {
                    appendMessage('Unknown', event.data);
                }
            };

            ws.onclose = function() {
                statusSpan.textContent = 'Disconnected';
                statusSpan.style.color = 'red';
                appendMessage('System', 'Disconnected from chat server');
                // Try to reconnect after 5 seconds
                setTimeout(connectWebSocket, 5000);
            };

            ws.onerror = function(error) {
                console.error('WebSocket error:', error);
                appendMessage('Error', 'Connection error occurred');
            };
        }

        function sendMessage() {
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                appendMessage('Error', 'WebSocket is not connected');
                return;
            }
            let message = msgInput.value;
            if (message.trim() === '') return;
            let username = localStorage.getItem('username') || 'Anonymous';
            let data = JSON.stringify({
                username: username,
                message: message
            });
            ws.send(data);
            msgInput.value = '';
        }

        function appendMessage(username, message) {
            let messageDiv = document.createElement('div');
            // textContent keeps user-supplied text from being interpreted as HTML
            messageDiv.textContent = username + ': ' + message;
            chatDiv.appendChild(messageDiv);
            chatDiv.scrollTop = chatDiv.scrollHeight;
        }

        // Set a username if not set
        window.onload = function() {
            if (!localStorage.getItem('username')) {
                let name = prompt('Please enter your name:', 'Anonymous');
                localStorage.setItem('username', name || 'Anonymous');
            }
            connectWebSocket();
        };

        // Allow sending message with Enter key
        msgInput.addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });
    </script>
</body>
</html>
Now let's modify our server code to handle chat messages:
// This version needs "encoding/json" added to the import block.
func (c *Client) ReadPump() {
	defer func() {
		c.Hub.Unregister <- c
		c.Conn.Close()
	}()
	c.Conn.SetReadLimit(1024)
	c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
	c.Conn.SetPongHandler(func(string) error {
		c.Conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		return nil
	})
	for {
		_, message, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}
		// Parse the message to ensure it's a valid JSON object;
		// anything else is silently dropped
		var msgData map[string]interface{}
		if err := json.Unmarshal(message, &msgData); err == nil {
			// Broadcast the received message to all clients
			c.Hub.Broadcast <- message
		}
	}
}
Place the HTML file in a public directory in your project, and this simple chat application will allow multiple users to communicate in real time!
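The ReadPump above only checks that a message is a JSON object; it does not check that the fields the browser client relies on (`username` and `message`) are present. A stricter validation step could look like the following sketch. The `ChatMessage` type and `parseChatMessage` function are hypothetical names introduced for illustration; the field names match the JSON sent by the HTML client.

```go
package main

import (
	"encoding/json"
	"errors"
	"strings"
)

// ChatMessage mirrors the JSON object sent by the browser client.
type ChatMessage struct {
	Username string `json:"username"`
	Message  string `json:"message"`
}

// parseChatMessage decodes raw and rejects messages with missing or blank fields.
func parseChatMessage(raw []byte) (ChatMessage, error) {
	var m ChatMessage
	if err := json.Unmarshal(raw, &m); err != nil {
		return ChatMessage{}, err
	}
	if strings.TrimSpace(m.Username) == "" || strings.TrimSpace(m.Message) == "" {
		return ChatMessage{}, errors.New("username and message are required")
	}
	return m, nil
}
```

In ReadPump, a message would then be forwarded to the hub only when `parseChatMessage` returns a nil error.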
Advanced Broadcasting Techniques
Targeted Broadcasting
Sometimes, you may want to broadcast messages to specific subsets of clients. Let's modify our Hub to support this:
// Modified Hub structure with room support
type Hub struct {
	Clients    map[*Client]bool
	Rooms      map[string]map[*Client]bool
	Register   chan *Client
	Unregister chan *Client
	Broadcast  chan []byte
	RoomMsg    chan RoomMessage
	mutex      sync.Mutex
}

type RoomMessage struct {
	Room    string
	Message []byte
}

// BroadcastToRoom sends a message to all clients in a specific room
func (h *Hub) BroadcastToRoom(room string, message []byte) {
	h.RoomMsg <- RoomMessage{
		Room:    room,
		Message: message,
	}
}

// JoinRoom adds a client to a room
func (h *Hub) JoinRoom(roomName string, client *Client) {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if _, ok := h.Rooms[roomName]; !ok {
		h.Rooms[roomName] = make(map[*Client]bool)
	}
	h.Rooms[roomName][client] = true
}

// LeaveRoom removes a client from a room
func (h *Hub) LeaveRoom(roomName string, client *Client) {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if _, ok := h.Rooms[roomName]; ok {
		delete(h.Rooms[roomName], client)
		// Clean up empty rooms
		if len(h.Rooms[roomName]) == 0 {
			delete(h.Rooms, roomName)
		}
	}
}
// dropClient removes a client from the hub and from every room, then closes
// its Send channel. The caller must hold h.mutex.
func (h *Hub) dropClient(client *Client) {
	if _, ok := h.Clients[client]; !ok {
		return // already removed; closing Send twice would panic
	}
	delete(h.Clients, client)
	for roomName, members := range h.Rooms {
		delete(members, client)
		// Clean up empty rooms
		if len(members) == 0 {
			delete(h.Rooms, roomName)
		}
	}
	close(client.Send)
}

// Update Run() function to handle room messages
func (h *Hub) Run() {
	for {
		select {
		case client := <-h.Register:
			h.mutex.Lock()
			h.Clients[client] = true
			h.mutex.Unlock()
		case client := <-h.Unregister:
			h.mutex.Lock()
			h.dropClient(client)
			h.mutex.Unlock()
		case message := <-h.Broadcast:
			h.mutex.Lock()
			for client := range h.Clients {
				select {
				case client.Send <- message:
				default:
					// Slow client: drop it everywhere so that no room
					// keeps a reference to a closed channel
					h.dropClient(client)
				}
			}
			h.mutex.Unlock()
		case roomMessage := <-h.RoomMsg:
			h.mutex.Lock()
			// Ranging over a missing room is a no-op
			for client := range h.Rooms[roomMessage.Room] {
				select {
				case client.Send <- roomMessage.Message:
				default:
					h.dropClient(client)
				}
			}
			h.mutex.Unlock()
		}
	}
}
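The modified Hub adds a Rooms map and a RoomMsg channel, so the constructor shown earlier needs to initialize them as well. Otherwise JoinRoom would panic on its first write to a nil map, and BroadcastToRoom would block forever on a nil channel. A minimal sketch of the updated constructor follows; the Client type is cut down to its Send queue here, purely so the example compiles without the WebSocket library.

```go
package main

import "sync"

// Client is reduced to its outbound queue so the sketch stands alone.
type Client struct {
	Send chan []byte
}

type RoomMessage struct {
	Room    string
	Message []byte
}

type Hub struct {
	Clients    map[*Client]bool
	Rooms      map[string]map[*Client]bool
	Register   chan *Client
	Unregister chan *Client
	Broadcast  chan []byte
	RoomMsg    chan RoomMessage
	mutex      sync.Mutex
}

// NewHub initializes every map and channel, including the room-related ones.
func NewHub() *Hub {
	return &Hub{
		Clients:    make(map[*Client]bool),
		Rooms:      make(map[string]map[*Client]bool),
		Register:   make(chan *Client),
		Unregister: make(chan *Client),
		Broadcast:  make(chan []byte),
		RoomMsg:    make(chan RoomMessage),
	}
}
```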
User Authentication and Private Messages
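For more advanced applications, you might want to authenticate users and allow private messaging. The code below covers only the messaging half: it assumes each client's Username has already been set by whatever authentication step you use.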
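type Client struct {
	ID       string
	Username string
	Conn     *websocket.Conn
	Send     chan []byte
	Hub      *Hub
}

// SendToUser delivers a message to every connection of the given user.
// It assumes the room-aware Hub from the previous section.
func (h *Hub) SendToUser(username string, message []byte) {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	for client := range h.Clients {
		if client.Username != username {
			continue
		}
		select {
		case client.Send <- message:
		default:
			// Slow client: remove it from the hub and from every room before
			// closing Send, so nothing sends on a closed channel later
			delete(h.Clients, client)
			for roomName, members := range h.Rooms {
				delete(members, client)
				if len(members) == 0 {
					delete(h.Rooms, roomName)
				}
			}
			close(client.Send)
		}
	}
}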
Performance Considerations
When implementing WebSocket broadcasting for large-scale applications, consider these performance tips:
- Message Buffering: Ensure your send buffers are large enough to absorb message bursts, but not so large that they consume excessive memory.
- Connection Limits: Be aware of the maximum number of concurrent WebSocket connections your server can handle.
- Message Size: Keep messages small and efficient. Consider compression for larger payloads.
- Load Balancing: For applications with many users, implement a distributed WebSocket system using Redis or other message brokers.
Best Practices
- Heartbeats: Implement ping/pong heartbeats to detect dead connections.
- Graceful Disconnections: Handle client disconnections properly to avoid resource leaks.
- Error Handling: Provide meaningful error messages to clients when something goes wrong.
- Rate Limiting: Implement rate limiting to prevent abuse.
- Reconnection Strategy: On the client side, implement exponential backoff for reconnection attempts.
Summary
In this guide, we've explored how to implement WebSocket broadcasting using the Echo framework. We covered:
- Creating a centralized Hub to manage WebSocket connections
- Implementing client handlers for reading from and writing to WebSockets
- Setting up Echo routes for WebSocket connections
- Building a simple chat application to demonstrate broadcasting
- Advanced techniques like room-based broadcasting and private messages
- Performance considerations and best practices
With these tools and techniques, you can build powerful real-time applications that leverage the full potential of WebSockets for broadcasting messages to multiple clients.
Exercises
- Extend the chat application to support different chat rooms.
- Implement a system to show which users are currently online.
- Add support for private messaging between users.
- Create a notification system that broadcasts events to specific users.
- Implement a simple collaborative drawing application where multiple users can draw on the same canvas in real-time.