RabbitMQ Shovel Plugin
Introduction
The RabbitMQ Shovel plugin is a powerful tool that provides a way to reliably move messages from a source queue to a destination queue - even across different RabbitMQ brokers. Think of it as a specialized consumer that reads messages from one location and republishes them to another.
Shovels can be used for various purposes, such as:
- Connecting brokers across different data centers
- Implementing fault-tolerant architectures
- Load balancing between broker instances
- Data migration between environments
Unlike federation, which focuses on exchange-to-exchange bindings, shovels work directly with queues, offering more control over message transfer.
How Shovels Work
A shovel works by:
- Connecting to a source broker/queue
- Consuming messages from the source
- Connecting to a destination broker/queue
- Republishing messages to the destination
- Acknowledging messages from the source once successfully published to destination
The beauty of shovels is that they handle all the complexities of connection management, including automatic reconnection if either broker becomes temporarily unavailable.
Enabling the Shovel Plugin
Before you can use shovels, you need to enable the plugin:
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
The first command enables the core shovel functionality, while the second adds a management UI extension that allows you to monitor and manage shovels through the RabbitMQ Management Interface.
Creating a Shovel
You can create shovels in two ways:
- Dynamic Shovels: Created and managed at runtime via the HTTP API or management UI
- Static Shovels: Defined in the RabbitMQ configuration file
Let's look at both methods:
Dynamic Shovels
Dynamic shovels can be created using the management plugin or the HTTP API. Here's how to create one using the rabbitmqctl
command-line tool:
rabbitmqctl set_parameter shovel my_shovel \
'{"src-protocol": "amqp091",
"src-uri": "amqp://guest:[email protected]:5672",
"src-queue": "source_queue",
"dest-protocol": "amqp091",
"dest-uri": "amqp://guest:[email protected]:5672",
"dest-queue": "destination_queue"}'
This creates a shovel named my_shovel
that moves messages from source_queue
on the source broker to destination_queue
on the destination broker.
Static Shovels
Static shovels are defined in the RabbitMQ configuration file. Here's an example configuration (for the newer config format):
shovel.definition.my_shovel.source.protocol = amqp091
shovel.definition.my_shovel.source.uris.0 = amqp://guest:[email protected]:5672
shovel.definition.my_shovel.source.queue = source_queue
shovel.definition.my_shovel.destination.protocol = amqp091
shovel.definition.my_shovel.destination.uris.0 = amqp://guest:[email protected]:5672
shovel.definition.my_shovel.destination.queue = destination_queue
For the classical format:
[
{rabbitmq_shovel,
[{shovels,
[{my_shovel,
[{source,
[{protocol, amqp091},
{uris, ["amqp://guest:[email protected]:5672"]},
{queue, <<"source_queue">>}]},
{destination,
[{protocol, amqp091},
{uris, ["amqp://guest:[email protected]:5672"]},
{queue, <<"destination_queue">>}]}
]}
]}
]}
].
Advanced Shovel Configuration Options
Shovels provide many configuration options to fine-tune their behavior:
Source Options
shovel.definition.my_shovel.source.prefetch_count = 1000 # Number of unacknowledged messages
shovel.definition.my_shovel.source.delete_after = never # or 'queue-length', or count (e.g., 1000)
shovel.definition.my_shovel.source.ack_mode = on-confirm # or 'no-ack' or 'on-publish'
Destination Options
shovel.definition.my_shovel.destination.add_forward_headers = true # Add headers about original message
shovel.definition.my_shovel.destination.publish_properties.delivery_mode = 2 # Make messages persistent
Practical Example: Disaster Recovery Setup
Let's create a practical example of using a shovel for disaster recovery. In this scenario, we want to ensure that all important messages are replicated to a backup data center.
First, we'll create the shovel:
rabbitmqctl set_parameter shovel disaster_recovery \
'{"src-protocol": "amqp091",
"src-uri": "amqp://user:[email protected]:5672",
"src-queue": "critical_orders",
"src-prefetch-count": 100,
"src-delete-after": "never",
"dest-protocol": "amqp091",
"dest-uri": "amqp://user:[email protected]:5672",
"dest-queue": "critical_orders_backup",
"dest-add-forward-headers": true,
"dest-publish-properties": {"delivery_mode": 2}}'
Now, let's create a Node.js application that publishes to the primary queue and can fail over to the backup queue if necessary:
const amqp = require('amqplib');
// Connection details
const PRIMARY_BROKER = 'amqp://user:[email protected]:5672';
const BACKUP_BROKER = 'amqp://user:[email protected]:5672';
const PRIMARY_QUEUE = 'critical_orders';
const BACKUP_QUEUE = 'critical_orders_backup';
async function publishOrder(order) {
let connection;
let channel;
try {
// Try connecting to the primary broker
connection = await amqp.connect(PRIMARY_BROKER);
channel = await connection.createChannel();
// Ensure queue exists
await channel.assertQueue(PRIMARY_QUEUE, { durable: true });
// Publish the message
channel.sendToQueue(
PRIMARY_QUEUE,
Buffer.from(JSON.stringify(order)),
{ persistent: true }
);
console.log(`Order ${order.id} published to primary queue`);
} catch (error) {
console.error(`Failed to publish to primary: ${error.message}`);
// Failover to backup
try {
if (connection) await connection.close();
connection = await amqp.connect(BACKUP_BROKER);
channel = await connection.createChannel();
await channel.assertQueue(BACKUP_QUEUE, { durable: true });
channel.sendToQueue(
BACKUP_QUEUE,
Buffer.from(JSON.stringify(order)),
{ persistent: true }
);
console.log(`Order ${order.id} published to backup queue`);
} catch (backupError) {
console.error(`Failed to publish to backup: ${backupError.message}`);
throw backupError;
}
} finally {
if (channel) await channel.close();
if (connection) await connection.close();
}
}
// Example usage
publishOrder({
id: "ORD-12345",
customer: "Acme Inc.",
amount: 199.99,
timestamp: new Date().toISOString()
});
On the consumer side, we'd have a similar setup that tries to consume from the primary first and falls back to the backup:
const amqp = require('amqplib');
async function startConsumer() {
let connection;
let channel;
try {
// Try connecting to the primary broker
connection = await amqp.connect(PRIMARY_BROKER);
channel = await connection.createChannel();
await channel.assertQueue(PRIMARY_QUEUE, { durable: true });
console.log('Consuming from primary queue...');
channel.consume(PRIMARY_QUEUE, (msg) => {
if (msg !== null) {
const order = JSON.parse(msg.content.toString());
console.log(`Processing order ${order.id}`);
// Process the order...
channel.ack(msg);
}
});
} catch (error) {
console.error(`Failed to connect to primary: ${error.message}`);
// Failover to backup
try {
if (connection) await connection.close();
connection = await amqp.connect(BACKUP_BROKER);
channel = await connection.createChannel();
await channel.assertQueue(BACKUP_QUEUE, { durable: true });
console.log('Consuming from backup queue...');
channel.consume(BACKUP_QUEUE, (msg) => {
if (msg !== null) {
const order = JSON.parse(msg.content.toString());
console.log(`Processing order ${order.id} from backup`);
// Process the order...
channel.ack(msg);
}
});
} catch (backupError) {
console.error(`Failed to connect to backup: ${backupError.message}`);
throw backupError;
}
}
}
startConsumer().catch(err => {
console.error('Consumer failed completely:', err);
process.exit(1);
});
This setup ensures that messages can still be processed even if the primary data center becomes unavailable.
Monitoring Shovels
You can monitor your shovels using the RabbitMQ management UI or the HTTP API. To check the status of all shovels via the command line:
rabbitmqctl list_shovels
Sample output:
name status
my_shovel running
Possible statuses include:
starting
: The shovel is starting uprunning
: The shovel is successfully transferring messagesterminated
: The shovel has stopped (possibly due to delete_after)
Common Issues and Troubleshooting
Connection Issues
If your shovel isn't running, check:
- Network connectivity between brokers
- Credentials used in URIs
- Virtual host access permissions
- Firewall rules
Memory Consumption
Shovels with high prefetch counts can consume significant memory. If you notice memory issues:
- Reduce the prefetch count
- Monitor broker memory usage
- Consider setting smaller batch sizes
Message Loss Prevention
To ensure no messages are lost:
- Use
ack_mode = on-confirm
- Make sure destination messages are marked as persistent (
delivery_mode = 2
) - Use durable queues on both source and destination
Summary
The RabbitMQ Shovel plugin provides a reliable way to transfer messages between queues and brokers. It's an essential tool for building resilient, distributed messaging architectures that can span multiple data centers and environments.
Key takeaways:
- Shovels move messages from a source to destination queue
- They can connect brokers across different locations
- Dynamic shovels can be created at runtime
- Shovels automatically reconnect on failure
- Configuration options allow fine-tuning of behavior
Exercises
- Basic Shovel Setup: Create a shovel that transfers messages between two local queues.
- Cross-broker Transfer: Set up a shovel that connects to a remote RabbitMQ instance.
- Failover System: Build a complete failover system that uses shovels to replicate messages to a backup broker.
- Monitoring Dashboard: Create a simple web dashboard that shows the status of your shovels.
- Delayed Message Transfer: Configure a shovel that adds a delay before republishing messages.
Additional Resources
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)