Skip to main content

Spring Cloud Stream

Introduction

Spring Cloud Stream is a framework for building highly scalable, event-driven microservices connected with shared messaging systems. It provides a flexible programming model built on established Spring concepts and best practices, including support for persistent publish-subscribe semantics, consumer groups, and stateful partitioning.

In today's microservices architecture, applications often need to communicate asynchronously. Spring Cloud Stream simplifies this process by abstracting away the complexities of working directly with message brokers such as Apache Kafka, RabbitMQ, or any other messaging platform.

Key Concepts

Before diving into implementation, let's understand the core concepts of Spring Cloud Stream:

1. Binder

A Binder provides the integration with the external messaging system. Spring Cloud Stream comes with binder implementations for:

  • Apache Kafka
  • RabbitMQ
  • Apache Pulsar
  • Google Pub/Sub
  • Solace PubSub+
  • Amazon Kinesis

2. Binding

Bindings are the connection between your application and the messaging system. The two primary bindings are:

  • Source (for sending messages)
  • Sink (for receiving messages)

3. Message

Spring Cloud Stream leverages Spring's messaging support, with messages containing:

  • Payload: The actual data being transferred
  • Headers: Metadata about the message

4. Destination

A destination refers to the broker-specific concept such as a Kafka topic or RabbitMQ exchange.

5. Consumer Group

Like in Kafka, a consumer group enables load-balancing among multiple instances of the same application.

Setting Up Spring Cloud Stream

Let's create a simple Spring Cloud Stream application that processes messages.

Step 1: Add Dependencies

First, add the necessary dependencies to your pom.xml:

xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>

<!-- For Kafka Binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Or if you're using Gradle, add to your build.gradle:

groovy
dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
}

Step 2: Configure Properties

Add configuration to your application.properties or application.yml:

yaml
spring:
cloud:
stream:
bindings:
input:
destination: input-topic
group: input-group
output:
destination: output-topic
kafka:
binder:
brokers: localhost:9092

This configuration:

  • Creates two bindings: input and output
  • Maps them to Kafka topics input-topic and output-topic
  • Specifies a consumer group for the input binding
  • Points to your Kafka broker(s)

Creating a Basic Stream Processor

Now let's create a simple processor that receives messages, transforms them, and sends them onward.

Functional Style (Spring Cloud Stream 3.x+)

With Spring Cloud Stream 3.x and later, the preferred approach is using functional programming style:

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import java.util.function.Function;

@SpringBootApplication
public class StreamProcessorApplication {

public static void main(String[] args) {
SpringApplication.run(StreamProcessorApplication.class, args);
}

@Bean
public Function<Message<String>, Message<String>> processMessage() {
return message -> {
// Process message
String payload = message.getPayload();
String transformedPayload = payload.toUpperCase();

// Log for demonstration
System.out.println("Received: " + payload);
System.out.println("Transformed to: " + transformedPayload);

// Return new message with transformed payload
return org.springframework.messaging.support.MessageBuilder
.withPayload(transformedPayload)
.copyHeaders(message.getHeaders())
.build();
};
}
}

And update your configuration to use this processor:

yaml
spring:
cloud:
function:
definition: processMessage
stream:
bindings:
processMessage-in-0:
destination: input-topic
group: input-group
processMessage-out-0:
destination: output-topic
kafka:
binder:
brokers: localhost:9092

Example: Real-time Order Processing

Let's build a practical example for an e-commerce application that processes orders in real-time.

Step 1: Define the Order Model

java
public class Order {
private String orderId;
private String customer;
private double amount;
private String status;

// Getters, setters, constructors omitted for brevity
}

Step 2: Create an Order Processor

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Function;

@SpringBootApplication
public class OrderProcessingApplication {

public static void main(String[] args) {
SpringApplication.run(OrderProcessingApplication.class, args);
}

@Bean
public Function<Order, Order> processOrder() {
return order -> {
// Validate order
if (order.getAmount() <= 0) {
order.setStatus("REJECTED");
} else {
// Process the order
order.setStatus("PROCESSED");
System.out.println("Processing order: " + order.getOrderId() +
" for customer: " + order.getCustomer() +
" with amount: $" + order.getAmount());
}
return order;
};
}
}

Step 3: Add Configuration

yaml
spring:
cloud:
function:
definition: processOrder
stream:
bindings:
processOrder-in-0:
destination: new-orders
group: order-processing-service
consumer:
concurrency: 3
processOrder-out-0:
destination: processed-orders
kafka:
binder:
brokers: localhost:9092

Input/Output Example

When a new order is placed, a message like this is sent to the new-orders topic:

json
{
"orderId": "ORD-12345",
"customer": "John Doe",
"amount": 99.95,
"status": "NEW"
}

After processing, it will be available in the processed-orders topic as:

json
{
"orderId": "ORD-12345",
"customer": "John Doe",
"amount": 99.95,
"status": "PROCESSED"
}

Advanced Features

Content Type Conversion

Spring Cloud Stream automatically handles content type conversion. For example, you can receive JSON and work with Java objects:

yaml
spring:
cloud:
stream:
bindings:
processOrder-in-0:
content-type: application/json

Error Handling

For handling errors, you can configure error channels and retry policies:

yaml
spring:
cloud:
stream:
bindings:
processOrder-in-0:
consumer:
max-attempts: 3
back-off-initial-interval: 1000
back-off-max-interval: 10000
back-off-multiplier: 2.0
default-retry-able: true

Partitioning

To ensure ordering or co-location of related messages, you can use partitioning:

yaml
spring:
cloud:
stream:
bindings:
processOrder-out-0:
producer:
partition-key-expression: headers['partitionKey']
partition-count: 3
processOrder-in-0:
consumer:
partitioned: true
instance-count: 3
instance-index: 0

Real-world Use Case: Notification System

Let's look at how Spring Cloud Stream could power a notification service in a microservices architecture:

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;

@SpringBootApplication
public class NotificationService {

public static void main(String[] args) {
SpringApplication.run(NotificationService.class, args);
}

@Bean
public Consumer<Notification> sendNotification() {
return notification -> {
switch(notification.getType()) {
case EMAIL:
sendEmail(notification);
break;
case SMS:
sendSms(notification);
break;
case PUSH:
sendPushNotification(notification);
break;
default:
System.out.println("Unknown notification type: " + notification.getType());
}
};
}

private void sendEmail(Notification notification) {
System.out.println("Sending email to " + notification.getRecipient() +
": " + notification.getMessage());
// Email sending logic
}

private void sendSms(Notification notification) {
System.out.println("Sending SMS to " + notification.getRecipient() +
": " + notification.getMessage());
// SMS sending logic
}

private void sendPushNotification(Notification notification) {
System.out.println("Sending push notification to " + notification.getRecipient() +
": " + notification.getMessage());
// Push notification logic
}

// Notification class would be defined elsewhere
}

With configuration:

yaml
spring:
cloud:
function:
definition: sendNotification
stream:
bindings:
sendNotification-in-0:
destination: notifications
group: notification-service
consumer:
concurrency: 5
kafka:
binder:
brokers: localhost:9092

This service consumes notifications from other services (like order processing, user registration, etc.) and sends them through appropriate channels without any direct coupling between services.

Summary

Spring Cloud Stream offers a powerful abstraction for building event-driven microservices. Its key benefits include:

  1. Decoupling - Services can communicate without direct knowledge of each other
  2. Flexibility - Switch between message brokers with minimal code changes
  3. Scalability - Scale producers and consumers independently
  4. Resilience - Handle temporary downtime in either producers or consumers
  5. Integration - Built on established Spring concepts

By focusing on business logic rather than messaging infrastructure details, Spring Cloud Stream allows developers to build robust, event-driven architectures with minimal boilerplate code.

Additional Resources

Exercises

  1. Create a simple message processor that filters out messages based on certain criteria.
  2. Implement an application that uses Spring Cloud Stream to connect to both Kafka and RabbitMQ.
  3. Build a real-time analytics service that processes events and aggregates statistics.
  4. Implement error handling logic that routes failed messages to a dead letter queue.
  5. Create a partitioned stream application that ensures messages for the same customer are always processed by the same application instance.


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)