Echo Command Query Responsibility
In this article, we'll explore the Command Query Responsibility Segregation (CQRS) pattern and how to implement it in Echo framework applications. CQRS is an architectural pattern that separates read operations (queries) from write operations (commands), which can lead to more maintainable and scalable applications.
Understanding CQRS
CQRS stands for Command Query Responsibility Segregation, a pattern that was introduced by Greg Young. It's an evolution of the Command-Query Separation (CQS) principle proposed by Bertrand Meyer, which states that methods should either:
- Commands: Change state but don't return data
 - Queries: Return data but don't change state
 
CQRS takes this concept further by separating the models, storage, and sometimes even the entire stack for reads and writes.
Benefits of CQRS
- Scalability: Read and write workloads can be scaled independently
 - Optimization: Read and write models can be optimized separately
 - Flexibility: Different database technologies can be used for different operations
 - Security: Permissions can be defined differently for read and write operations
 - Complexity management: Clearer separation of concerns makes complex domains more manageable
 
Implementing CQRS in Echo
Let's break down implementing CQRS in an Echo application step by step.
1. Define Your Command and Query Models
First, create separate structures for commands and queries:
// Command models (write operations)
type CreateUserCommand struct {
    Username string
    Email    string
    Password string
}
type UpdateUserCommand struct {
    ID       int
    Username string
    Email    string
}
// Query models (read operations)
type GetUserQuery struct {
    ID int
}
type ListUsersQuery struct {
    Page  int
    Limit int
}
// View models (query results)
type UserView struct {
    ID       int
    Username string
    Email    string
    JoinDate time.Time
}
2. Create Command and Query Handlers
Next, define handlers for processing these commands and queries:
// Command handlers
type CommandHandler interface {
    Handle(ctx context.Context, command interface{}) error
}
type CreateUserHandler struct {
    userRepo repository.UserRepository
}
func (h *CreateUserHandler) Handle(ctx context.Context, cmd interface{}) error {
    createCmd, ok := cmd.(*CreateUserCommand)
    if !ok {
        return errors.New("invalid command type")
    }
    // Process the command, validate data, handle business logic
    user := &domain.User{
        Username: createCmd.Username,
        Email:    createCmd.Email,
    }
    
    // Set password hash
    if err := user.SetPassword(createCmd.Password); err != nil {
        return err
    }
    
    // Save to database
    return h.userRepo.Create(ctx, user)
}
// Query handlers
type QueryHandler interface {
    Handle(ctx context.Context, query interface{}) (interface{}, error)
}
type GetUserHandler struct {
    userReadRepo repository.UserReadRepository
}
func (h *GetUserHandler) Handle(ctx context.Context, q interface{}) (interface{}, error) {
    query, ok := q.(*GetUserQuery)
    if !ok {
        return nil, errors.New("invalid query type")
    }
    
    // Fetch user from read database and map to view model
    user, err := h.userReadRepo.GetByID(ctx, query.ID)
    if err != nil {
        return nil, err
    }
    
    return &UserView{
        ID:       user.ID,
        Username: user.Username,
        Email:    user.Email,
        JoinDate: user.CreatedAt,
    }, nil
}
3. Create Dispatchers for Commands and Queries
The dispatchers route commands and queries to their appropriate handlers:
// Command dispatcher
type CommandDispatcher struct {
    handlers map[reflect.Type]CommandHandler
}
func NewCommandDispatcher() *CommandDispatcher {
    return &CommandDispatcher{
        handlers: make(map[reflect.Type]CommandHandler),
    }
}
func (d *CommandDispatcher) RegisterHandler(command interface{}, handler CommandHandler) {
    d.handlers[reflect.TypeOf(command)] = handler
}
func (d *CommandDispatcher) Dispatch(ctx context.Context, command interface{}) error {
    handlerType := reflect.TypeOf(command)
    handler, exists := d.handlers[handlerType]
    if !exists {
        return fmt.Errorf("no handler registered for command type %v", handlerType)
    }
    return handler.Handle(ctx, command)
}
// Similar implementation for QueryDispatcher
type QueryDispatcher struct {
    handlers map[reflect.Type]QueryHandler
}
// ... methods similar to CommandDispatcher
4. Setting Up Echo Handlers
Now let's create Echo handlers that utilize our CQRS components:
func SetupRoutes(e *echo.Echo, cmdDispatcher *CommandDispatcher, queryDispatcher *QueryDispatcher) {
    // Command endpoints
    e.POST("/users", func(c echo.Context) error {
        cmd := new(CreateUserCommand)
        if err := c.Bind(cmd); err != nil {
            return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid request"})
        }
        
        if err := cmdDispatcher.Dispatch(c.Request().Context(), cmd); err != nil {
            return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
        }
        
        return c.JSON(http.StatusCreated, map[string]string{"status": "User created"})
    })
    
    // Query endpoints
    e.GET("/users/:id", func(c echo.Context) error {
        id, err := strconv.Atoi(c.Param("id"))
        if err != nil {
            return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid user ID"})
        }
        
        query := &GetUserQuery{ID: id}
        result, err := queryDispatcher.Dispatch(c.Request().Context(), query)
        if err != nil {
            return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
        }
        
        return c.JSON(http.StatusOK, result)
    })
}
5. Wiring Everything Up
Finally, let's set up the application with our CQRS pattern:
func main() {
    // Initialize Echo
    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())
    
    // Setup repositories
    writeDB := setupWriteDatabase()
    readDB := setupReadDatabase()
    
    userWriteRepo := repository.NewUserRepository(writeDB)
    userReadRepo := repository.NewUserReadRepository(readDB)
    
    // Setup dispatchers
    cmdDispatcher := NewCommandDispatcher()
    queryDispatcher := NewQueryDispatcher()
    
    // Register handlers
    cmdDispatcher.RegisterHandler(&CreateUserCommand{}, &CreateUserHandler{userRepo: userWriteRepo})
    queryDispatcher.RegisterHandler(&GetUserQuery{}, &GetUserHandler{userReadRepo: userReadRepo})
    
    // Setup routes
    SetupRoutes(e, cmdDispatcher, queryDispatcher)
    
    // Start server
    e.Logger.Fatal(e.Start(":8080"))
}
Practical Example: User Management System
Let's implement a simple user management system with CQRS. In this example, we'll use PostgreSQL for the write database and Redis for the read database to demonstrate how CQRS can use different storage technologies.
Setting Up the Data Stores
// db/postgres.go
package db
import (
    "context"
    "database/sql"
    _ "github.com/lib/pq"
)
func SetupPostgresDB(connString string) (*sql.DB, error) {
    db, err := sql.Open("postgres", connString)
    if err != nil {
        return nil, err
    }
    
    if err = db.Ping(); err != nil {
        return nil, err
    }
    
    // Create users table if it doesn't exist
    _, err = db.Exec(`
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            username VARCHAR(100) UNIQUE NOT NULL,
            email VARCHAR(255) UNIQUE NOT NULL,
            password_hash VARCHAR(255) NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    `)
    
    return db, err
}
// db/redis.go
package db
import (
    "github.com/go-redis/redis/v8"
)
func SetupRedisClient(addr string) (*redis.Client, error) {
    client := redis.NewClient(&redis.Options{
        Addr: addr,
    })
    
    // Check connection
    ctx := context.Background()
    if err := client.Ping(ctx).Err(); err != nil {
        return nil, err
    }
    
    return client, nil
}
Repository Implementation
// repository/user_repository.go
package repository
import (
    "context"
    "database/sql"
    "encoding/json"
    "github.com/go-redis/redis/v8"
    "time"
    
    "myapp/domain"
)
// Write repository
type UserRepository struct {
    db *sql.DB
}
func NewUserRepository(db *sql.DB) *UserRepository {
    return &UserRepository{db: db}
}
func (r *UserRepository) Create(ctx context.Context, user *domain.User) error {
    query := `
        INSERT INTO users (username, email, password_hash) 
        VALUES ($1, $2, $3) RETURNING id, created_at
    `
    return r.db.QueryRowContext(
        ctx, 
        query, 
        user.Username, 
        user.Email, 
        user.PasswordHash,
    ).Scan(&user.ID, &user.CreatedAt)
}
// Read repository
type UserReadRepository struct {
    redis *redis.Client
    db    *sql.DB // Fallback for cache misses
}
func NewUserReadRepository(redis *redis.Client, db *sql.DB) *UserReadRepository {
    return &UserReadRepository{
        redis: redis,
        db:    db,
    }
}
func (r *UserReadRepository) GetByID(ctx context.Context, id int) (*domain.User, error) {
    // Try to get from cache first
    userKey := fmt.Sprintf("user:%d", id)
    userJSON, err := r.redis.Get(ctx, userKey).Result()
    
    // If found in cache, unmarshal and return
    if err == nil {
        user := &domain.User{}
        if err := json.Unmarshal([]byte(userJSON), user); err == nil {
            return user, nil
        }
    }
    
    // Cache miss, get from database
    user := &domain.User{}
    err = r.db.QueryRowContext(
        ctx,
        "SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1",
        id,
    ).Scan(&user.ID, &user.Username, &user.Email, &user.PasswordHash, &user.CreatedAt)
    
    if err != nil {
        return nil, err
    }
    
    // Update cache for future requests
    userBytes, _ := json.Marshal(user)
    r.redis.Set(ctx, userKey, userBytes, time.Hour) // Cache for 1 hour
    
    return user, nil
}
Event Synchronization Between Write and Read Models
In a real CQRS system, we need to make sure the read model is updated when the write model changes. This can be done through events:
// events/events.go
package events
import (
    "context"
    "encoding/json"
    "myapp/domain"
    "time"
    
    "github.com/go-redis/redis/v8"
)
type UserCreatedEvent struct {
    ID        int
    Username  string
    Email     string
    CreatedAt time.Time
}
type EventPublisher struct {
    redis *redis.Client
}
func NewEventPublisher(redis *redis.Client) *EventPublisher {
    return &EventPublisher{redis: redis}
}
func (p *EventPublisher) PublishUserCreated(ctx context.Context, user *domain.User) error {
    event := UserCreatedEvent{
        ID:        user.ID,
        Username:  user.Username,
        Email:     user.Email,
        CreatedAt: user.CreatedAt,
    }
    
    eventBytes, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    // Publish to Redis for subscribers
    return p.redis.Publish(ctx, "user_events", eventBytes).Err()
}
// EventHandler is responsible for updating the read model when events occur
type EventHandler struct {
    redis *redis.Client
}
func NewEventHandler(redis *redis.Client) *EventHandler {
    return &EventHandler{redis: redis}
}
func (h *EventHandler) HandleUserCreated(ctx context.Context, event *UserCreatedEvent) error {
    user := &domain.User{
        ID:        event.ID,
        Username:  event.Username,
        Email:     event.Email,
        CreatedAt: event.CreatedAt,
    }
    
    // Update the read model (Redis cache)
    userBytes, err := json.Marshal(user)
    if err != nil {
        return err
    }
    
    userKey := fmt.Sprintf("user:%d", user.ID)
    return h.redis.Set(ctx, userKey, userBytes, time.Hour).Err()
}
Extending the Command Handler to Publish Events
type CreateUserHandler struct {
    userRepo      repository.UserRepository
    eventPublisher events.EventPublisher
}
func (h *CreateUserHandler) Handle(ctx context.Context, cmd interface{}) error {
    createCmd, ok := cmd.(*CreateUserCommand)
    if !ok {
        return errors.New("invalid command type")
    }
    
    // Process the command
    user := &domain.User{
        Username: createCmd.Username,
        Email:    createCmd.Email,
    }
    
    if err := user.SetPassword(createCmd.Password); err != nil {
        return err
    }
    
    // Save to database
    if err := h.userRepo.Create(ctx, user); err != nil {
        return err
    }
    
    // Publish event to update read models
    return h.eventPublisher.PublishUserCreated(ctx, user)
}
Running the Application
With our CQRS implementation in place, let's see how to run this application:
// main.go
func main() {
    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())
    
    // Setup databases
    writeDB, err := db.SetupPostgresDB("postgres://user:pass@localhost:5432/userdb")
    if err != nil {
        e.Logger.Fatal(err)
    }
    
    redisClient, err := db.SetupRedisClient("localhost:6379")
    if err != nil {
        e.Logger.Fatal(err)
    }
    
    // Setup repositories
    userWriteRepo := repository.NewUserRepository(writeDB)
    userReadRepo := repository.NewUserReadRepository(redisClient, writeDB)
    
    // Setup event handling
    eventPublisher := events.NewEventPublisher(redisClient)
    eventHandler := events.NewEventHandler(redisClient)
    
    // Subscribe to user events
    subscriber := redisClient.Subscribe(context.Background(), "user_events")
    go func() {
        for {
            msg, err := subscriber.ReceiveMessage(context.Background())
            if err != nil {
                log.Println("Error receiving message:", err)
                continue
            }
            
            var event events.UserCreatedEvent
            if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
                log.Println("Error unmarshaling event:", err)
                continue
            }
            
            if err := eventHandler.HandleUserCreated(context.Background(), &event); err != nil {
                log.Println("Error handling event:", err)
            }
        }
    }()
    
    // Setup dispatchers
    cmdDispatcher := NewCommandDispatcher()
    queryDispatcher := NewQueryDispatcher()
    
    // Register handlers
    cmdDispatcher.RegisterHandler(&CreateUserCommand{}, &CreateUserHandler{
        userRepo:      userWriteRepo,
        eventPublisher: eventPublisher,
    })
    
    queryDispatcher.RegisterHandler(&GetUserQuery{}, &GetUserHandler{
        userReadRepo: userReadRepo,
    })
    
    // Setup routes
    SetupRoutes(e, cmdDispatcher, queryDispatcher)
    
    // Start server
    e.Logger.Fatal(e.Start(":8080"))
}
Testing Our API
Here are examples of how to use our CQRS-based API:
Create a new user (Command)
Request:
curl -X POST http://localhost:8080/users \
  -H 'Content-Type: application/json' \
  -d '{
    "username": "johndoe",
    "email": "[email protected]",
    "password": "securepassword"
  }'
Response:
{
  "status": "User created"
}
Get user information (Query)
Request:
curl http://localhost:8080/users/1
Response:
{
  "id": 1,
  "username": "johndoe",
  "email": "[email protected]",
  "joinDate": "2023-08-21T15:30:45Z"
}
When to Use CQRS in Echo Applications
CQRS is a powerful pattern, but it's not appropriate for every application. Consider using CQRS when:
- Different read and write workloads: Your application has significantly different read and write patterns or volumes
 - Complex domains: Your domain logic is complex with many business rules
 - Scalability requirements: You need to scale read and write operations independently
 - Integration with event sourcing: You're using event sourcing for your domain
 
Avoid CQRS for simple CRUD applications where the complexity overhead might not be justified.
Summary
In this tutorial, we've explored the Command Query Responsibility Segregation (CQRS) pattern and implemented it in an Echo framework application. We've:
- Separated read and write operations into commands and queries
 - Created handlers for processing commands and queries
 - Implemented dispatchers for routing operations to appropriate handlers
 - Integrated with Echo to create a complete web API
 - Used different data stores for read and write operations
 - Synchronized data between read and write models using events
 
CQRS offers significant benefits in terms of scalability, performance, and maintainability, especially for complex domain models. However, it does add complexity, so it should be used judiciously where the benefits outweigh the costs.
Additional Resources
- Original CQRS pattern by Martin Fowler
 - Event Sourcing and CQRS
 - Echo Framework Documentation
 - Domain-Driven Design principles
 
Exercises
- Extend the application to include user profile information in a separate read model
 - Implement command validation using middleware
 - Add event sourcing to track all changes to user data
 - Create a query to search users by username or email
 - Implement authentication and authorization for the API endpoints
 
By solving these exercises, you'll gain a deeper understanding of CQRS and how it can be effectively applied in Echo applications.
💡 Found a typo or mistake? Click "Edit this page" to suggest a correction. Your feedback is greatly appreciated!