Skip to main content

RabbitMQ Shovel Plugin

Introduction

The RabbitMQ Shovel plugin is a powerful tool that provides a way to reliably move messages from a source queue to a destination queue - even across different RabbitMQ brokers. Think of it as a specialized consumer that reads messages from one location and republishes them to another.

Shovels can be used for various purposes, such as:

  • Connecting brokers across different data centers
  • Implementing fault-tolerant architectures
  • Load balancing between broker instances
  • Data migration between environments

Unlike federation, which focuses on exchange-to-exchange bindings, shovels work directly with queues, offering more control over message transfer.

How Shovels Work

A shovel works by:

  1. Connecting to a source broker/queue
  2. Consuming messages from the source
  3. Connecting to a destination broker/queue
  4. Republishing messages to the destination
  5. Acknowledging messages from the source once successfully published to destination

The beauty of shovels is that they handle all the complexities of connection management, including automatic reconnection if either broker becomes temporarily unavailable.

Enabling the Shovel Plugin

Before you can use shovels, you need to enable the plugin:

bash
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

The first command enables the core shovel functionality, while the second adds a management UI extension that allows you to monitor and manage shovels through the RabbitMQ Management Interface.

Creating a Shovel

You can create shovels in two ways:

  1. Dynamic Shovels: Created and managed at runtime via the HTTP API or management UI
  2. Static Shovels: Defined in the RabbitMQ configuration file

Let's look at both methods:

Dynamic Shovels

Dynamic shovels can be created using the management plugin or the HTTP API. Here's how to create one using the rabbitmqctl command-line tool:

bash
rabbitmqctl set_parameter shovel my_shovel \
'{"src-protocol": "amqp091",
"src-uri": "amqp://guest:[email protected]:5672",
"src-queue": "source_queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://guest:[email protected]:5672",
"dest-queue": "destination_queue"}'

This creates a shovel named my_shovel that moves messages from source_queue on the source broker to destination_queue on the destination broker.

Static Shovels

Static shovels are defined in the RabbitMQ configuration file. Here's an example configuration (for the newer config format):

erlang
shovel.definition.my_shovel.source.protocol = amqp091
shovel.definition.my_shovel.source.uris.0 = amqp://guest:[email protected]:5672
shovel.definition.my_shovel.source.queue = source_queue
shovel.definition.my_shovel.destination.protocol = amqp091
shovel.definition.my_shovel.destination.uris.0 = amqp://guest:[email protected]:5672
shovel.definition.my_shovel.destination.queue = destination_queue

For the classical format:

erlang
[
{rabbitmq_shovel,
[{shovels,
[{my_shovel,
[{source,
[{protocol, amqp091},
{uris, ["amqp://guest:[email protected]:5672"]},
{queue, <<"source_queue">>}]},
{destination,
[{protocol, amqp091},
{uris, ["amqp://guest:[email protected]:5672"]},
{queue, <<"destination_queue">>}]}
]}
]}
]}
].

Advanced Shovel Configuration Options

Shovels provide many configuration options to fine-tune their behavior:

Source Options

erlang
shovel.definition.my_shovel.source.prefetch_count = 1000        # Number of unacknowledged messages 
shovel.definition.my_shovel.source.delete_after = never # or 'queue-length', or count (e.g., 1000)
shovel.definition.my_shovel.source.ack_mode = on-confirm # or 'no-ack' or 'on-publish'

Destination Options

erlang
shovel.definition.my_shovel.destination.add_forward_headers = true  # Add headers about original message
shovel.definition.my_shovel.destination.publish_properties.delivery_mode = 2 # Make messages persistent

Practical Example: Disaster Recovery Setup

Let's create a practical example of using a shovel for disaster recovery. In this scenario, we want to ensure that all important messages are replicated to a backup data center.

First, we'll create the shovel:

bash
rabbitmqctl set_parameter shovel disaster_recovery \
'{"src-protocol": "amqp091",
"src-uri": "amqp://user:[email protected]:5672",
"src-queue": "critical_orders",
"src-prefetch-count": 100,
"src-delete-after": "never",
"dest-protocol": "amqp091",
"dest-uri": "amqp://user:[email protected]:5672",
"dest-queue": "critical_orders_backup",
"dest-add-forward-headers": true,
"dest-publish-properties": {"delivery_mode": 2}}'

Now, let's create a Node.js application that publishes to the primary queue and can fail over to the backup queue if necessary:

javascript
const amqp = require('amqplib');

// Connection details
const PRIMARY_BROKER = 'amqp://user:[email protected]:5672';
const BACKUP_BROKER = 'amqp://user:[email protected]:5672';
const PRIMARY_QUEUE = 'critical_orders';
const BACKUP_QUEUE = 'critical_orders_backup';

async function publishOrder(order) {
let connection;
let channel;

try {
// Try connecting to the primary broker
connection = await amqp.connect(PRIMARY_BROKER);
channel = await connection.createChannel();

// Ensure queue exists
await channel.assertQueue(PRIMARY_QUEUE, { durable: true });

// Publish the message
channel.sendToQueue(
PRIMARY_QUEUE,
Buffer.from(JSON.stringify(order)),
{ persistent: true }
);

console.log(`Order ${order.id} published to primary queue`);
} catch (error) {
console.error(`Failed to publish to primary: ${error.message}`);

// Failover to backup
try {
if (connection) await connection.close();

connection = await amqp.connect(BACKUP_BROKER);
channel = await connection.createChannel();

await channel.assertQueue(BACKUP_QUEUE, { durable: true });

channel.sendToQueue(
BACKUP_QUEUE,
Buffer.from(JSON.stringify(order)),
{ persistent: true }
);

console.log(`Order ${order.id} published to backup queue`);
} catch (backupError) {
console.error(`Failed to publish to backup: ${backupError.message}`);
throw backupError;
}
} finally {
if (channel) await channel.close();
if (connection) await connection.close();
}
}

// Example usage
publishOrder({
id: "ORD-12345",
customer: "Acme Inc.",
amount: 199.99,
timestamp: new Date().toISOString()
});

On the consumer side, we'd have a similar setup that tries to consume from the primary first and falls back to the backup:

javascript
const amqp = require('amqplib');

async function startConsumer() {
let connection;
let channel;

try {
// Try connecting to the primary broker
connection = await amqp.connect(PRIMARY_BROKER);
channel = await connection.createChannel();

await channel.assertQueue(PRIMARY_QUEUE, { durable: true });

console.log('Consuming from primary queue...');
channel.consume(PRIMARY_QUEUE, (msg) => {
if (msg !== null) {
const order = JSON.parse(msg.content.toString());
console.log(`Processing order ${order.id}`);

// Process the order...

channel.ack(msg);
}
});
} catch (error) {
console.error(`Failed to connect to primary: ${error.message}`);

// Failover to backup
try {
if (connection) await connection.close();

connection = await amqp.connect(BACKUP_BROKER);
channel = await connection.createChannel();

await channel.assertQueue(BACKUP_QUEUE, { durable: true });

console.log('Consuming from backup queue...');
channel.consume(BACKUP_QUEUE, (msg) => {
if (msg !== null) {
const order = JSON.parse(msg.content.toString());
console.log(`Processing order ${order.id} from backup`);

// Process the order...

channel.ack(msg);
}
});
} catch (backupError) {
console.error(`Failed to connect to backup: ${backupError.message}`);
throw backupError;
}
}
}

startConsumer().catch(err => {
console.error('Consumer failed completely:', err);
process.exit(1);
});

This setup ensures that messages can still be processed even if the primary data center becomes unavailable.

Monitoring Shovels

You can monitor your shovels using the RabbitMQ management UI or the HTTP API. To check the status of all shovels via the command line:

bash
rabbitmqctl list_shovels

Sample output:

name       status
my_shovel running

Possible statuses include:

  • starting: The shovel is starting up
  • running: The shovel is successfully transferring messages
  • terminated: The shovel has stopped (possibly due to delete_after)

Common Issues and Troubleshooting

Connection Issues

If your shovel isn't running, check:

  1. Network connectivity between brokers
  2. Credentials used in URIs
  3. Virtual host access permissions
  4. Firewall rules

Memory Consumption

Shovels with high prefetch counts can consume significant memory. If you notice memory issues:

  1. Reduce the prefetch count
  2. Monitor broker memory usage
  3. Consider setting smaller batch sizes

Message Loss Prevention

To ensure no messages are lost:

  1. Use ack_mode = on-confirm
  2. Make sure destination messages are marked as persistent (delivery_mode = 2)
  3. Use durable queues on both source and destination

Summary

The RabbitMQ Shovel plugin provides a reliable way to transfer messages between queues and brokers. It's an essential tool for building resilient, distributed messaging architectures that can span multiple data centers and environments.

Key takeaways:

  • Shovels move messages from a source to destination queue
  • They can connect brokers across different locations
  • Dynamic shovels can be created at runtime
  • Shovels automatically reconnect on failure
  • Configuration options allow fine-tuning of behavior

Exercises

  1. Basic Shovel Setup: Create a shovel that transfers messages between two local queues.
  2. Cross-broker Transfer: Set up a shovel that connects to a remote RabbitMQ instance.
  3. Failover System: Build a complete failover system that uses shovels to replicate messages to a backup broker.
  4. Monitoring Dashboard: Create a simple web dashboard that shows the status of your shovels.
  5. Delayed Message Transfer: Configure a shovel that adds a delay before republishing messages.

Additional Resources



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)