Echo Command Query Responsibility

In this article, we'll explore the Command Query Responsibility Segregation (CQRS) pattern and how to implement it in Echo framework applications. CQRS is an architectural pattern that separates read operations (queries) from write operations (commands), which can lead to more maintainable and scalable applications.

Understanding CQRS

CQRS stands for Command Query Responsibility Segregation, a pattern introduced by Greg Young. It evolved from the Command-Query Separation (CQS) principle proposed by Bertrand Meyer, which states that every method should be one of two kinds:

  • Commands: Change state but don't return data
  • Queries: Return data but don't change state

CQRS takes this concept further by separating the models, storage, and sometimes even the entire stack for reads and writes.
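
As a minimal sketch of the CQS principle itself, the hypothetical `UserDirectory` type below (not part of the application built later) has one command method that changes state and returns nothing, and one query method that returns data without side effects:

```go
package main

import "fmt"

// UserDirectory is a hypothetical type used only to illustrate CQS.
type UserDirectory struct {
	names map[int]string
}

// Rename is a command: it changes state and returns no data.
func (d *UserDirectory) Rename(id int, username string) {
	d.names[id] = username
}

// Username is a query: it returns data and leaves state untouched.
func (d *UserDirectory) Username(id int) (string, bool) {
	name, ok := d.names[id]
	return name, ok
}

func main() {
	dir := &UserDirectory{names: map[int]string{}}
	dir.Rename(1, "johndoe")

	name, found := dir.Username(1)
	fmt.Println(name, found) // johndoe true
}
```

CQRS applies the same split one level up: instead of separating methods on one type, it separates whole models and, where useful, their storage.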

Benefits of CQRS

  • Scalability: Read and write workloads can be scaled independently
  • Optimization: Read and write models can be optimized separately
  • Flexibility: Different database technologies can be used for different operations
  • Security: Permissions can be defined differently for read and write operations
  • Complexity management: Clearer separation of concerns makes complex domains more manageable

Implementing CQRS in Echo

Let's break down implementing CQRS in an Echo application step by step.

1. Define Your Command and Query Models

First, create separate structures for commands and queries:

go
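// Command models (write operations)
type CreateUserCommand struct {
	Username string `json:"username"`
	Email    string `json:"email"`
	Password string `json:"password"`
}

type UpdateUserCommand struct {
	ID       int    `json:"id"`
	Username string `json:"username"`
	Email    string `json:"email"`
}

// Query models (read operations)
type GetUserQuery struct {
	ID int
}

type ListUsersQuery struct {
	Page  int
	Limit int
}

// View models (query results).
// The JSON tags produce the lowercase keys shown in the API responses later on.
type UserView struct {
	ID       int       `json:"id"`
	Username string    `json:"username"`
	Email    string    `json:"email"`
	JoinDate time.Time `json:"joinDate"`
}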
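
Commands usually need validation before they reach business logic. The sketch below shows one way to attach that to the command itself. The `Validate` method and its rules (non-empty username, parseable email, minimum password length of 8) are assumptions for illustration, not part of the original design:

```go
package main

import (
	"errors"
	"fmt"
	"net/mail"
	"strings"
)

// CreateUserCommand mirrors the command model above.
type CreateUserCommand struct {
	Username string
	Email    string
	Password string
}

// Validate is a hypothetical helper; the rules are illustrative only.
func (c CreateUserCommand) Validate() error {
	if strings.TrimSpace(c.Username) == "" {
		return errors.New("username is required")
	}
	if _, err := mail.ParseAddress(c.Email); err != nil {
		return fmt.Errorf("invalid email: %w", err)
	}
	if len(c.Password) < 8 {
		return errors.New("password must be at least 8 characters")
	}
	return nil
}

func main() {
	good := CreateUserCommand{Username: "johndoe", Email: "johndoe@example.com", Password: "securepassword"}
	fmt.Println(good.Validate()) // <nil>

	bad := CreateUserCommand{Username: "johndoe", Email: "not-an-email", Password: "securepassword"}
	fmt.Println(bad.Validate() != nil) // true
}
```

A command handler could call `Validate` first and return early, which keeps malformed input away from the repository.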

2. Create Command and Query Handlers

Next, define handlers for processing these commands and queries:

go
// Command handlers
type CommandHandler interface {
	Handle(ctx context.Context, command interface{}) error
}

type CreateUserHandler struct {
	// The repository constructors return pointers, so the field is a pointer too.
	userRepo *repository.UserRepository
}

func (h *CreateUserHandler) Handle(ctx context.Context, cmd interface{}) error {
	createCmd, ok := cmd.(*CreateUserCommand)
	if !ok {
		return errors.New("invalid command type")
	}

	// Process the command, validate data, handle business logic
	user := &domain.User{
		Username: createCmd.Username,
		Email:    createCmd.Email,
	}

	// Set password hash
	if err := user.SetPassword(createCmd.Password); err != nil {
		return err
	}

	// Save to database
	return h.userRepo.Create(ctx, user)
}

// Query handlers
type QueryHandler interface {
	Handle(ctx context.Context, query interface{}) (interface{}, error)
}

type GetUserHandler struct {
	userReadRepo *repository.UserReadRepository
}

func (h *GetUserHandler) Handle(ctx context.Context, q interface{}) (interface{}, error) {
	query, ok := q.(*GetUserQuery)
	if !ok {
		return nil, errors.New("invalid query type")
	}

	// Fetch user from read database and map to view model
	user, err := h.userReadRepo.GetByID(ctx, query.ID)
	if err != nil {
		return nil, err
	}

	return &UserView{
		ID:       user.ID,
		Username: user.Username,
		Email:    user.Email,
		JoinDate: user.CreatedAt,
	}, nil
}
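
The handlers above accept `interface{}` and recover the concrete type with a runtime assertion. A type-safe alternative, sketched here with Go generics (Go 1.18 or later), moves that check to compile time. This is a different technique from the one used in the rest of the article. The in-memory `createUserHandler` and the `createUser` helper are hypothetical stand-ins:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// CommandHandler is a generic variant: C is the concrete command type.
type CommandHandler[C any] interface {
	Handle(ctx context.Context, cmd C) error
}

type CreateUserCommand struct {
	Username string
	Email    string
}

// createUserHandler keeps commands in memory; a stand-in for a real repository.
type createUserHandler struct {
	saved []CreateUserCommand
}

func (h *createUserHandler) Handle(ctx context.Context, cmd CreateUserCommand) error {
	if cmd.Username == "" {
		return errors.New("username is required")
	}
	h.saved = append(h.saved, cmd)
	return nil
}

// Compile-time check that the handler satisfies the generic interface.
var _ CommandHandler[CreateUserCommand] = (*createUserHandler)(nil)

// createUser only accepts handlers for CreateUserCommand; passing a handler
// for another command type would fail to compile.
func createUser(h CommandHandler[CreateUserCommand], username string) error {
	return h.Handle(context.Background(), CreateUserCommand{Username: username})
}

func main() {
	h := &createUserHandler{}
	err := createUser(h, "johndoe")
	fmt.Println(err, len(h.saved)) // <nil> 1
}
```

The trade-off is that handlers for different command types no longer share one interface type, so a single map-based dispatcher needs an adapter per handler.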

3. Create Dispatchers for Commands and Queries

The dispatchers route commands and queries to their appropriate handlers:

go
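// Command dispatcher
type CommandDispatcher struct {
	handlers map[reflect.Type]CommandHandler
}

func NewCommandDispatcher() *CommandDispatcher {
	return &CommandDispatcher{
		handlers: make(map[reflect.Type]CommandHandler),
	}
}

func (d *CommandDispatcher) RegisterHandler(command interface{}, handler CommandHandler) {
	d.handlers[reflect.TypeOf(command)] = handler
}

func (d *CommandDispatcher) Dispatch(ctx context.Context, command interface{}) error {
	handlerType := reflect.TypeOf(command)
	handler, exists := d.handlers[handlerType]
	if !exists {
		return fmt.Errorf("no handler registered for command type %v", handlerType)
	}

	return handler.Handle(ctx, command)
}

// Query dispatcher: same structure, but Dispatch also returns a result
type QueryDispatcher struct {
	handlers map[reflect.Type]QueryHandler
}

func NewQueryDispatcher() *QueryDispatcher {
	return &QueryDispatcher{
		handlers: make(map[reflect.Type]QueryHandler),
	}
}

func (d *QueryDispatcher) RegisterHandler(query interface{}, handler QueryHandler) {
	d.handlers[reflect.TypeOf(query)] = handler
}

func (d *QueryDispatcher) Dispatch(ctx context.Context, query interface{}) (interface{}, error) {
	handlerType := reflect.TypeOf(query)
	handler, exists := d.handlers[handlerType]
	if !exists {
		return nil, fmt.Errorf("no handler registered for query type %v", handlerType)
	}

	return handler.Handle(ctx, query)
}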

4. Setting Up Echo Handlers

Now let's create Echo handlers that utilize our CQRS components:

go
func SetupRoutes(e *echo.Echo, cmdDispatcher *CommandDispatcher, queryDispatcher *QueryDispatcher) {
	// Command endpoints
	e.POST("/users", func(c echo.Context) error {
		cmd := new(CreateUserCommand)
		if err := c.Bind(cmd); err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid request"})
		}

		if err := cmdDispatcher.Dispatch(c.Request().Context(), cmd); err != nil {
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
		}

		return c.JSON(http.StatusCreated, map[string]string{"status": "User created"})
	})

	// Query endpoints
	e.GET("/users/:id", func(c echo.Context) error {
		id, err := strconv.Atoi(c.Param("id"))
		if err != nil {
			return c.JSON(http.StatusBadRequest, map[string]string{"error": "Invalid user ID"})
		}

		query := &GetUserQuery{ID: id}
		result, err := queryDispatcher.Dispatch(c.Request().Context(), query)
		if err != nil {
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
		}

		return c.JSON(http.StatusOK, result)
	})
}
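
The routes above answer every dispatcher error with a 500 status. A possible refinement, sketched here with only the standard library, is to map well-known errors to more specific status codes. The sentinel errors and the `statusFor` and `wrap` helpers are hypothetical names, not part of the original code:

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical sentinel errors that handlers or repositories could return.
var (
	ErrNotFound   = errors.New("not found")
	ErrValidation = errors.New("validation failed")
	ErrConflict   = errors.New("already exists")
)

// wrap adds context while keeping the sentinel discoverable via errors.Is.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// statusFor maps an error to an HTTP status code, defaulting to 500.
func statusFor(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrValidation):
		return http.StatusBadRequest
	case errors.Is(err, ErrConflict):
		return http.StatusConflict
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	fmt.Println(statusFor(wrap("get user 42", ErrNotFound))) // 404
	fmt.Println(statusFor(errors.New("db down")))            // 500
}
```

An Echo handler could then call `c.JSON(statusFor(err), ...)` instead of hard-coding `http.StatusInternalServerError`.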

5. Wiring Everything Up

Finally, let's set up the application with our CQRS pattern:

go
func main() {
	// Initialize Echo
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Setup repositories.
	// setupWriteDatabase and setupReadDatabase are placeholders here, and the
	// constructor arguments are simplified; the practical example later in this
	// article shows concrete versions.
	writeDB := setupWriteDatabase()
	readDB := setupReadDatabase()

	userWriteRepo := repository.NewUserRepository(writeDB)
	userReadRepo := repository.NewUserReadRepository(readDB)

	// Setup dispatchers
	cmdDispatcher := NewCommandDispatcher()
	queryDispatcher := NewQueryDispatcher()

	// Register handlers
	cmdDispatcher.RegisterHandler(&CreateUserCommand{}, &CreateUserHandler{userRepo: userWriteRepo})
	queryDispatcher.RegisterHandler(&GetUserQuery{}, &GetUserHandler{userReadRepo: userReadRepo})

	// Setup routes
	SetupRoutes(e, cmdDispatcher, queryDispatcher)

	// Start server
	e.Logger.Fatal(e.Start(":8080"))
}

Practical Example: User Management System

Let's implement a simple user management system with CQRS. In this example, we'll use PostgreSQL for the write database and Redis for the read database to demonstrate how CQRS can use different storage technologies.

Setting Up the Data Stores

go
// db/postgres.go
package db

import (
	"database/sql"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

func SetupPostgresDB(connString string) (*sql.DB, error) {
	db, err := sql.Open("postgres", connString)
	if err != nil {
		return nil, err
	}

	if err = db.Ping(); err != nil {
		return nil, err
	}

	// Create users table if it doesn't exist
	_, err = db.Exec(`
		CREATE TABLE IF NOT EXISTS users (
			id SERIAL PRIMARY KEY,
			username VARCHAR(100) UNIQUE NOT NULL,
			email VARCHAR(255) UNIQUE NOT NULL,
			password_hash VARCHAR(255) NOT NULL,
			created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
		)
	`)
	if err != nil {
		return nil, err
	}

	return db, nil
}

// db/redis.go
package db

import (
	"context"

	"github.com/go-redis/redis/v8"
)

func SetupRedisClient(addr string) (*redis.Client, error) {
	client := redis.NewClient(&redis.Options{
		Addr: addr,
	})

	// Check connection
	ctx := context.Background()
	if err := client.Ping(ctx).Err(); err != nil {
		return nil, err
	}

	return client, nil
}

Repository Implementation

go
// repository/user_repository.go
package repository

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"

	"myapp/domain"
)

// Write repository
type UserRepository struct {
	db *sql.DB
}

func NewUserRepository(db *sql.DB) *UserRepository {
	return &UserRepository{db: db}
}

func (r *UserRepository) Create(ctx context.Context, user *domain.User) error {
	query := `
		INSERT INTO users (username, email, password_hash)
		VALUES ($1, $2, $3) RETURNING id, created_at
	`
	return r.db.QueryRowContext(
		ctx,
		query,
		user.Username,
		user.Email,
		user.PasswordHash,
	).Scan(&user.ID, &user.CreatedAt)
}

// Read repository
type UserReadRepository struct {
	redis *redis.Client
	db    *sql.DB // Fallback for cache misses
}

func NewUserReadRepository(redis *redis.Client, db *sql.DB) *UserReadRepository {
	return &UserReadRepository{
		redis: redis,
		db:    db,
	}
}

func (r *UserReadRepository) GetByID(ctx context.Context, id int) (*domain.User, error) {
	// Try to get from cache first
	userKey := fmt.Sprintf("user:%d", id)
	userJSON, err := r.redis.Get(ctx, userKey).Result()

	// If found in cache, unmarshal and return
	if err == nil {
		user := &domain.User{}
		if err := json.Unmarshal([]byte(userJSON), user); err == nil {
			return user, nil
		}
	}

	// Cache miss (or unreadable cache entry), get from database
	user := &domain.User{}
	err = r.db.QueryRowContext(
		ctx,
		"SELECT id, username, email, password_hash, created_at FROM users WHERE id = $1",
		id,
	).Scan(&user.ID, &user.Username, &user.Email, &user.PasswordHash, &user.CreatedAt)

	if err != nil {
		return nil, err
	}

	// Update cache for future requests; a cache write failure is not fatal
	if userBytes, err := json.Marshal(user); err == nil {
		r.redis.Set(ctx, userKey, userBytes, time.Hour) // Cache for 1 hour
	}

	return user, nil
}
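
`GetByID` follows the cache-aside pattern: check the cache, fall back to the source of truth on a miss, then populate the cache. The sketch below isolates that control flow using an in-memory map in place of Redis and a function in place of the SQL query. The `cacheAside` type, `newCacheAside`, and `defaultTTL` are hypothetical names for illustration:

```go
package main

import (
	"fmt"
	"time"
)

const defaultTTL = time.Hour // mirrors the one-hour expiry used above

type entry struct {
	value     string
	expiresAt time.Time
}

// cacheAside puts a TTL cache in front of a slower loader.
// The map is an in-memory stand-in for Redis and is not safe for concurrent use.
type cacheAside struct {
	ttl   time.Duration
	items map[int]entry
	load  func(id int) (string, error) // stand-in for the SQL query
}

func newCacheAside(ttl time.Duration, load func(int) (string, error)) *cacheAside {
	return &cacheAside{ttl: ttl, items: map[int]entry{}, load: load}
}

func (c *cacheAside) Get(id int) (string, error) {
	if e, ok := c.items[id]; ok && time.Now().Before(e.expiresAt) {
		return e.value, nil // cache hit
	}
	v, err := c.load(id) // cache miss or expired entry
	if err != nil {
		return "", err // errors are not cached
	}
	c.items[id] = entry{value: v, expiresAt: time.Now().Add(c.ttl)}
	return v, nil
}

func main() {
	loads := 0
	c := newCacheAside(defaultTTL, func(id int) (string, error) {
		loads++
		return fmt.Sprintf("user-%d", id), nil
	})
	c.Get(1)
	c.Get(1)
	fmt.Println(loads) // 1: the second call was served from the cache
}
```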

Event Synchronization Between Write and Read Models

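In a real CQRS system, the read model must be updated whenever the write model changes. One way to do this is with events: the write side publishes an event after each change, and a subscriber applies it to the read model. The example uses Redis Pub/Sub, which delivers each message at most once and only to subscribers connected at that moment, so a missed event is covered only by the read repository's database fallback: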

go
// events/events.go
package events

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"

	"myapp/domain"
)

type UserCreatedEvent struct {
	ID        int
	Username  string
	Email     string
	CreatedAt time.Time
}

type EventPublisher struct {
	redis *redis.Client
}

func NewEventPublisher(redis *redis.Client) *EventPublisher {
	return &EventPublisher{redis: redis}
}

func (p *EventPublisher) PublishUserCreated(ctx context.Context, user *domain.User) error {
	event := UserCreatedEvent{
		ID:        user.ID,
		Username:  user.Username,
		Email:     user.Email,
		CreatedAt: user.CreatedAt,
	}

	eventBytes, err := json.Marshal(event)
	if err != nil {
		return err
	}

	// Publish to Redis for subscribers
	return p.redis.Publish(ctx, "user_events", eventBytes).Err()
}

// EventHandler is responsible for updating the read model when events occur
type EventHandler struct {
	redis *redis.Client
}

func NewEventHandler(redis *redis.Client) *EventHandler {
	return &EventHandler{redis: redis}
}

func (h *EventHandler) HandleUserCreated(ctx context.Context, event *UserCreatedEvent) error {
	user := &domain.User{
		ID:        event.ID,
		Username:  event.Username,
		Email:     event.Email,
		CreatedAt: event.CreatedAt,
	}

	// Update the read model (Redis cache)
	userBytes, err := json.Marshal(user)
	if err != nil {
		return err
	}

	userKey := fmt.Sprintf("user:%d", user.ID)
	return h.redis.Set(ctx, userKey, userBytes, time.Hour).Err()
}
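
The publish-then-project flow can be tried without Redis. In the sketch below, a buffered Go channel stands in for the `user_events` channel and a map stands in for the Redis-backed read model. The `readModel` type and `project` function are hypothetical names:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type UserCreatedEvent struct {
	ID       int
	Username string
	Email    string
}

// readModel is an in-memory stand-in for the Redis-backed read store.
type readModel map[string]string

// project applies one serialized event to the read model.
func project(rm readModel, payload []byte) error {
	var ev UserCreatedEvent
	if err := json.Unmarshal(payload, &ev); err != nil {
		return fmt.Errorf("decode event: %w", err)
	}
	view, err := json.Marshal(ev)
	if err != nil {
		return err
	}
	rm[fmt.Sprintf("user:%d", ev.ID)] = string(view)
	return nil
}

func main() {
	events := make(chan []byte, 1) // stand-in for the "user_events" channel
	rm := readModel{}

	// Write side: serialize and publish the event.
	payload, err := json.Marshal(UserCreatedEvent{ID: 1, Username: "johndoe", Email: "johndoe@example.com"})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	events <- payload
	close(events)

	// Read side: apply each event to the read model.
	for p := range events {
		if err := project(rm, p); err != nil {
			fmt.Println("projection failed:", err)
		}
	}
	fmt.Println(rm["user:1"])
}
```

Because `project` simply overwrites the key, applying the same event twice leaves the read model unchanged, which is a useful property when events can be redelivered.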

Extending the Command Handler to Publish Events

go
type CreateUserHandler struct {
	// Both constructors return pointers, so the fields are pointers too.
	userRepo       *repository.UserRepository
	eventPublisher *events.EventPublisher
}

func (h *CreateUserHandler) Handle(ctx context.Context, cmd interface{}) error {
	createCmd, ok := cmd.(*CreateUserCommand)
	if !ok {
		return errors.New("invalid command type")
	}

	// Process the command
	user := &domain.User{
		Username: createCmd.Username,
		Email:    createCmd.Email,
	}

	if err := user.SetPassword(createCmd.Password); err != nil {
		return err
	}

	// Save to database
	if err := h.userRepo.Create(ctx, user); err != nil {
		return err
	}

	// Publish event to update read models
	return h.eventPublisher.PublishUserCreated(ctx, user)
}

Running the Application

With our CQRS implementation in place, let's see how to run this application:

go
// main.go
package main

// The Echo import paths assume Echo v4; "myapp" is the module name used in
// the repository imports earlier in this article.
import (
	"context"
	"encoding/json"
	"log"

	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"

	"myapp/db"
	"myapp/events"
	"myapp/repository"
)

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Setup databases
	writeDB, err := db.SetupPostgresDB("postgres://user:pass@localhost:5432/userdb")
	if err != nil {
		e.Logger.Fatal(err)
	}

	redisClient, err := db.SetupRedisClient("localhost:6379")
	if err != nil {
		e.Logger.Fatal(err)
	}

	// Setup repositories
	userWriteRepo := repository.NewUserRepository(writeDB)
	userReadRepo := repository.NewUserReadRepository(redisClient, writeDB)

	// Setup event handling
	eventPublisher := events.NewEventPublisher(redisClient)
	eventHandler := events.NewEventHandler(redisClient)

	// Subscribe to user events
	subscriber := redisClient.Subscribe(context.Background(), "user_events")
	defer subscriber.Close()
	go func() {
		for {
			msg, err := subscriber.ReceiveMessage(context.Background())
			if err != nil {
				log.Println("Error receiving message:", err)
				continue
			}

			var event events.UserCreatedEvent
			if err := json.Unmarshal([]byte(msg.Payload), &event); err != nil {
				log.Println("Error unmarshaling event:", err)
				continue
			}

			if err := eventHandler.HandleUserCreated(context.Background(), &event); err != nil {
				log.Println("Error handling event:", err)
			}
		}
	}()

	// Setup dispatchers
	cmdDispatcher := NewCommandDispatcher()
	queryDispatcher := NewQueryDispatcher()

	// Register handlers
	cmdDispatcher.RegisterHandler(&CreateUserCommand{}, &CreateUserHandler{
		userRepo:       userWriteRepo,
		eventPublisher: eventPublisher,
	})

	queryDispatcher.RegisterHandler(&GetUserQuery{}, &GetUserHandler{
		userReadRepo: userReadRepo,
	})

	// Setup routes
	SetupRoutes(e, cmdDispatcher, queryDispatcher)

	// Start server
	e.Logger.Fatal(e.Start(":8080"))
}
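
The subscriber goroutine above runs until the process exits. If the consumer should stop cleanly, one option is to drive the loop with a cancellable context. The sketch below shows the shape of such a loop using a plain Go channel of strings as a stand-in for the Redis subscription. The `consume` and `runUntilCancelled` functions are hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// consume processes messages until ctx is cancelled or the channel is closed.
// In the application above, msgs would be fed by the Redis subscription.
func consume(ctx context.Context, msgs <-chan string, handle func(string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case m, ok := <-msgs:
			if !ok {
				return
			}
			handle(m)
		}
	}
}

// runUntilCancelled feeds inputs to a consumer goroutine, then shuts it down.
func runUntilCancelled(inputs []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	msgs := make(chan string) // unbuffered: a send returns once the consumer has received
	var handled []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		consume(ctx, msgs, func(m string) { handled = append(handled, m) })
	}()

	for _, in := range inputs {
		msgs <- in
	}
	cancel()  // signal shutdown
	wg.Wait() // wait for the consumer goroutine to exit
	return handled
}

func main() {
	handled := runUntilCancelled([]string{"user created: 1", "user created: 2"})
	fmt.Println(len(handled)) // 2
}
```

Waiting on the `sync.WaitGroup` before returning ensures the consumer has finished its last message, so shutdown does not cut a read-model update short.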

Testing Our API

Here are examples of how to use our CQRS-based API:

Create a new user (Command)

Request:

bash
curl -X POST http://localhost:8080/users \
  -H 'Content-Type: application/json' \
  -d '{
    "username": "johndoe",
    "email": "johndoe@example.com",
    "password": "securepassword"
  }'

Response:

json
{
  "status": "User created"
}

Get user information (Query)

Request:

bash
curl http://localhost:8080/users/1

Response:

json
{
  "id": 1,
  "username": "johndoe",
  "email": "johndoe@example.com",
  "joinDate": "2023-08-21T15:30:45Z"
}

When to Use CQRS in Echo Applications

CQRS is a powerful pattern, but it's not appropriate for every application. Consider using CQRS when:

  1. Different read and write workloads: Your application has significantly different read and write patterns or volumes
  2. Complex domains: Your domain logic is complex with many business rules
  3. Scalability requirements: You need to scale read and write operations independently
  4. Integration with event sourcing: You're using event sourcing for your domain

Avoid CQRS for simple CRUD applications where the complexity overhead might not be justified.

Summary

In this tutorial, we've explored the Command Query Responsibility Segregation (CQRS) pattern and implemented it in an Echo framework application. We've:

  • Separated read and write operations into commands and queries
  • Created handlers for processing commands and queries
  • Implemented dispatchers for routing operations to appropriate handlers
  • Integrated with Echo to create a complete web API
  • Used different data stores for read and write operations
  • Synchronized data between read and write models using events

CQRS offers significant benefits in terms of scalability, performance, and maintainability, especially for complex domain models. However, it does add complexity, so it should be used judiciously where the benefits outweigh the costs.

Exercises

  1. Extend the application to include user profile information in a separate read model
  2. Implement command validation using middleware
  3. Add event sourcing to track all changes to user data
  4. Create a query to search users by username or email
  5. Implement authentication and authorization for the API endpoints

By solving these exercises, you'll gain a deeper understanding of CQRS and how it can be effectively applied in Echo applications.


