RabbitMQ MQTT Plugin
Introduction
The MQTT plugin for RabbitMQ enables the broker to support the MQTT protocol (versions 3.1 and 3.1.1), a lightweight messaging protocol designed for Internet of Things (IoT) devices, mobile applications, and other scenarios where network bandwidth is limited. By integrating MQTT with RabbitMQ, you get the best of both worlds: the lightweight, publish/subscribe model of MQTT and the robust message queuing capabilities of RabbitMQ.
This guide will walk you through understanding, enabling, configuring, and using the RabbitMQ MQTT plugin for your applications.
What is MQTT?
MQTT (Message Queuing Telemetry Transport) is a lightweight, publish-subscribe network protocol designed for constrained devices and low-bandwidth, high-latency, or unreliable networks. Key characteristics include:
- Lightweight: Minimal packet overhead (as small as 2 bytes)
- Pub/Sub Pattern: Decouples publishers from subscribers
- QoS Levels: Three quality of service levels (0, 1, 2)
- Last Will and Testament: Messages sent when clients disconnect unexpectedly
- Retained Messages: Latest message saved for new subscribers
How MQTT and RabbitMQ Work Together
RabbitMQ's MQTT plugin creates a bridge between MQTT clients and RabbitMQ's powerful message broker infrastructure:
The MQTT plugin translates between MQTT concepts and RabbitMQ concepts:
MQTT Concept | RabbitMQ Implementation |
---|---|
Topic | Exchange + Routing Key |
QoS | Message Properties + Acknowledgements |
Retained Messages | Stored in a special exchange |
Last Will | Special message published on disconnect |
Enabling the MQTT Plugin
The MQTT plugin is included in the RabbitMQ distribution but needs to be enabled before use:
rabbitmq-plugins enable rabbitmq_mqtt
This command enables the plugin and automatically starts the MQTT listener on port 1883 (the default MQTT port).
Configuration Options
You can configure the MQTT plugin through the RabbitMQ configuration file. Here's an example of common configuration options:
mqtt.listeners.tcp.default = 1883
mqtt.listeners.ssl.default = 8883
mqtt.allow_anonymous = true
mqtt.vhost = /
mqtt.exchange = amq.topic
mqtt.subscription_ttl = 86400000
mqtt.prefetch = 10
Let's break down these configuration options:
- mqtt.listeners.tcp.default: The port for unencrypted MQTT connections
- mqtt.listeners.ssl.default: The port for SSL/TLS-encrypted MQTT connections
- mqtt.allow_anonymous: Whether to allow connections without credentials
- mqtt.vhost: The virtual host to use for MQTT connections
- mqtt.exchange: The exchange to which all MQTT messages are published
- mqtt.subscription_ttl: How long subscriptions last when clients disconnect (in milliseconds)
- mqtt.prefetch: Maximum number of unacknowledged messages delivered to clients
Authentication and Authorization
By default, the MQTT plugin uses RabbitMQ's authentication mechanism. You can configure client authentication in several ways:
Username/Password Authentication
MQTT clients can connect using a username and password that's validated against RabbitMQ's user database:
// JavaScript MQTT client example (using MQTT.js)
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883', {
username: 'guest',
password: 'guest'
});
client.on('connect', () => {
console.log('Connected to RabbitMQ MQTT broker');
});
SSL/TLS Client Certificates
For more secure authentication, you can use SSL client certificates:
# In RabbitMQ configuration
mqtt.listeners.ssl.default = 8883
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
Publishing and Subscribing with MQTT
Let's look at basic MQTT operations with RabbitMQ:
Publishing Messages
Here's a Python example using the Paho MQTT client to publish a message:
import paho.mqtt.client as mqtt
import json
import time
# Callback when client connects
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to RabbitMQ MQTT broker")
else:
print(f"Failed to connect, return code {rc}")
# Create client instance
client = mqtt.Client()
client.username_pw_set("guest", "guest") # Set username and password
client.on_connect = on_connect
# Connect to broker
client.connect("localhost", 1883, 60)
client.loop_start()
# Wait for connection to establish
time.sleep(1)
# Publish message
topic = "sensors/temperature"
payload = json.dumps({
"device_id": "thermostat-living-room",
"temperature": 22.5,
"humidity": 45,
"timestamp": time.time()
})
client.publish(topic, payload, qos=1)
print(f"Published message to {topic}")
# Disconnect
client.loop_stop()
client.disconnect()
Subscribing to Messages
Here's how to subscribe to messages:
import paho.mqtt.client as mqtt
import time
import json
# Callback when client connects
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to RabbitMQ MQTT broker")
# Subscribe to topic
client.subscribe("sensors/#", qos=1)
print("Subscribed to sensors/#")
else:
print(f"Failed to connect, return code {rc}")
# Callback when message is received
def on_message(client, userdata, msg):
print(f"Received message on topic {msg.topic}:")
try:
payload = json.loads(msg.payload.decode())
print(f"Device: {payload['device_id']}")
print(f"Temperature: {payload['temperature']}°C")
print(f"Humidity: {payload['humidity']}%")
print(f"Timestamp: {payload['timestamp']}")
except:
print(f"Raw payload: {msg.payload.decode()}")
# Create client instance
client = mqtt.Client()
client.username_pw_set("guest", "guest")
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Start loop to process callbacks
client.loop_forever()
Topic Structures and Wildcards
MQTT uses a hierarchical topic structure with support for wildcards:
- Single-level wildcard (
+
): Matches exactly one topic level - Multi-level wildcard (
#
): Matches any number of topic levels
Examples:
-
sensors/+/temperature
would match:sensors/living-room/temperature
sensors/kitchen/temperature
- But not
sensors/kitchen/upstairs/temperature
-
sensors/#
would match:sensors/living-room/temperature
sensors/kitchen/humidity
sensors/kitchen/upstairs/temperature
Quality of Service (QoS) Levels
MQTT supports three QoS levels:
- QoS 0 (At most once): Message is delivered at most once, with no confirmation
- QoS 1 (At least once): Message is delivered at least once, with confirmation
- QoS 2 (Exactly once): Message is delivered exactly once, using a 4-part handshake
# Publishing with different QoS levels
client.publish("sensors/temperature", payload, qos=0) # Fire and forget
client.publish("sensors/temperature", payload, qos=1) # At least once delivery
client.publish("sensors/temperature", payload, qos=2) # Exactly once delivery
Retained Messages
MQTT allows publishers to mark messages as "retained." A retained message is stored by the broker and sent to any new subscriber to that topic:
# Publishing a retained message
client.publish("sensors/living-room/config",
json.dumps({"sample_rate": 60, "alert_threshold": 30}),
qos=1,
retain=True)
Last Will and Testament
MQTT clients can specify a "last will" message that the broker will publish if the client disconnects unexpectedly:
# Setting up Last Will and Testament
client = mqtt.Client()
client.username_pw_set("guest", "guest")
client.will_set(
topic="devices/thermostat/status",
payload=json.dumps({"status": "offline", "device_id": "thermostat-living-room"}),
qos=1,
retain=True
)
Practical Example: IoT Monitoring System
Let's create a simple IoT monitoring system using RabbitMQ's MQTT plugin:
Device Simulator (Publisher)
import paho.mqtt.client as mqtt
import json
import time
import random
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to RabbitMQ MQTT broker")
# Publish an online status message as retained
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"status": "online"}),
qos=1,
retain=True
)
else:
print(f"Failed to connect, return code {rc}")
# Configure device
DEVICE_ID = f"sensor-{random.randint(1000, 9999)}"
LOCATION = "living-room"
# Create client
client = mqtt.Client(client_id=DEVICE_ID)
client.username_pw_set("guest", "guest")
client.on_connect = on_connect
# Set Last Will
client.will_set(
topic=f"devices/{DEVICE_ID}/status",
payload=json.dumps({"status": "offline"}),
qos=1,
retain=True
)
# Connect to broker
client.connect("localhost", 1883, 60)
client.loop_start()
# Wait for connection
time.sleep(1)
try:
# Send data regularly
while True:
# Generate simulated sensor data
temperature = 20 + 5 * random.random()
humidity = 30 + 20 * random.random()
battery = 100 - (time.time() % 10000) / 100
# Publish to different topics
client.publish(
f"sensors/{LOCATION}/temperature",
json.dumps({
"device_id": DEVICE_ID,
"value": round(temperature, 1),
"unit": "C",
"timestamp": time.time()
}),
qos=1
)
client.publish(
f"sensors/{LOCATION}/humidity",
json.dumps({
"device_id": DEVICE_ID,
"value": round(humidity, 1),
"unit": "%",
"timestamp": time.time()
}),
qos=1
)
client.publish(
f"devices/{DEVICE_ID}/battery",
json.dumps({
"level": round(battery, 1),
"unit": "%"
}),
qos=1
)
print(f"Published data: Temp={round(temperature, 1)}°C, Humidity={round(humidity, 1)}%")
time.sleep(5)
except KeyboardInterrupt:
# Clean disconnect
client.publish(
f"devices/{DEVICE_ID}/status",
json.dumps({"status": "offline"}),
qos=1,
retain=True
)
client.loop_stop()
client.disconnect()
print("Disconnected from broker")
Monitoring Application (Subscriber)
import paho.mqtt.client as mqtt
import json
from datetime import datetime
# Dictionary to store device states
devices = {}
sensor_data = {}
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to RabbitMQ MQTT broker")
# Subscribe to all device and sensor topics
client.subscribe("devices/#", qos=1)
client.subscribe("sensors/#", qos=1)
print("Monitoring system online")
else:
print(f"Failed to connect, return code {rc}")
def on_message(client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
topic = msg.topic
parts = topic.split('/')
# Format timestamp
timestamp = datetime.now().strftime("%H:%M:%S")
if parts[0] == "devices" and parts[2] == "status":
device_id = parts[1]
status = payload.get("status")
devices[device_id] = status
print(f"[{timestamp}] Device {device_id}: {status}")
# Show currently connected devices
online_count = list(devices.values()).count("online")
print(f"Active devices: {online_count}/{len(devices)}")
elif parts[0] == "devices" and parts[2] == "battery":
device_id = parts[1]
battery = payload.get("level")
print(f"[{timestamp}] Device {device_id} battery: {battery}%")
# Alert on low battery
if battery < 20:
print(f"⚠️ WARNING: Low battery on {device_id}!")
elif parts[0] == "sensors":
location = parts[1]
metric = parts[2]
value = payload.get("value")
unit = payload.get("unit")
device_id = payload.get("device_id")
# Store sensor data
if location not in sensor_data:
sensor_data[location] = {}
if metric not in sensor_data[location]:
sensor_data[location][metric] = {}
sensor_data[location][metric] = {
"value": value,
"unit": unit,
"device_id": device_id,
"updated": timestamp
}
print(f"[{timestamp}] {location} {metric}: {value}{unit}")
# Display current readings periodically
if len(sensor_data) > 0 and len(list(sensor_data.values())[0]) > 1:
print("
--- Current Readings ---")
for loc, metrics in sensor_data.items():
print(f"Location: {loc}")
for met, data in metrics.items():
print(f" {met}: {data['value']}{data['unit']} (from {data['device_id']})")
print("----------------------
")
except json.JSONDecodeError:
print(f"Received non-JSON message on {msg.topic}: {msg.payload.decode()}")
except Exception as e:
print(f"Error processing message: {e}")
# Create client
client = mqtt.Client(client_id="monitoring-system")
client.username_pw_set("guest", "guest")
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
client.connect("localhost", 1883, 60)
# Start loop
client.loop_forever()
Common Issues and Troubleshooting
Here are some common issues you might encounter when working with the RabbitMQ MQTT plugin:
Connection Problems
If your MQTT clients can't connect to RabbitMQ:
-
Check if the plugin is enabled:
bashrabbitmqctl list_plugins
-
Verify MQTT listener is running:
bashrabbitmqctl list_listeners
-
Check authentication credentials - ensure the user exists and has permissions:
bashrabbitmqctl list_users
rabbitmqctl list_user_permissions guest
Message Delivery Issues
If messages aren't being delivered correctly:
- Check subscriber topic patterns - ensure wildcards are used correctly
- Verify QoS settings - higher QoS levels add reliability but more overhead
- Inspect RabbitMQ management UI - look for queues, exchanges, and bindings
Debugging MQTT Traffic
To debug MQTT traffic:
- Enable verbose logging in your MQTT client
- Use a packet sniffer like Wireshark with the MQTT protocol filter
- Monitor RabbitMQ logs:
bash
tail -f /var/log/rabbitmq/[email protected]
Performance Considerations
To optimize performance when using the MQTT plugin:
- QoS Level Selection: Use QoS 0 for non-critical, high-volume messages
- Topic Design: Keep topic hierarchy reasonably flat to improve routing performance
- Client Connection Management: Use persistent connections rather than frequent reconnects
- Message Size: Keep payloads small (use compression if necessary)
- Subscription Patterns: Avoid overly broad wildcards that match many topics
Summary
The RabbitMQ MQTT plugin provides a powerful integration between the lightweight MQTT protocol and RabbitMQ's robust message broker. This combination is particularly valuable for IoT applications, mobile clients, and scenarios where network bandwidth is at a premium.
Key takeaways:
- MQTT is a lightweight publish/subscribe protocol ideal for constrained devices
- The RabbitMQ MQTT plugin allows MQTT clients to interact with the RabbitMQ broker
- The plugin supports core MQTT features including QoS levels, retained messages, and Last Will
- Proper configuration ensures security, reliability, and performance
Exercises
- Set up RabbitMQ with the MQTT plugin and connect a simple MQTT client.
- Create a publish/subscribe system with different QoS levels and observe the differences.
- Experiment with retained messages to provide configuration settings to new subscribers.
- Implement a simple IoT monitoring system similar to the example provided.
- Explore how to bridge MQTT with AMQP by having an AMQP consumer receive messages published by MQTT clients.
Additional Resources
- RabbitMQ MQTT Plugin Documentation
- MQTT Specification
- Eclipse Mosquitto - Popular MQTT broker for testing
- MQTT.js - JavaScript MQTT client
- Eclipse Paho - MQTT clients in various languages
- HiveMQ MQTT Client Tool - Browser-based MQTT client
For further reading on messaging patterns and distributed systems that combine MQTT with other protocols, explore the broader RabbitMQ documentation or books on message-oriented middleware.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)