MongoDB Resume Tokens
When working with MongoDB Change Streams, one of the most powerful features available is the ability to resume a change stream from where it left off. This functionality is made possible through Resume Tokens — special markers that MongoDB provides to help you maintain continuity in your data streaming applications.
What are Resume Tokens?
Resume tokens are opaque tokens that uniquely identify a position in the change stream. Think of them as bookmarks that allow you to pick up exactly where you left off when your application reconnects after disconnection or failure.
Resume tokens are crucial for building resilient applications that can handle:
- Network interruptions
- Application crashes
- Scheduled maintenance
- Rolling deployments
How Resume Tokens Work
When you open a change stream, MongoDB maintains a position in the oplog (operation log) that corresponds to your current place in the stream. As you read events from the change stream, MongoDB generates resume tokens that represent this position.
Resume Token Format
A resume token is a BSON document that contains special fields MongoDB uses internally to track positions in the oplog. While you don't need to understand the internal structure to use them, it's helpful to see what they look like:
{
  "_data": "825F02486800000001000000015022C3FA37244A3BAE0B847782CF4A746F6964003C5F696400645F024868119EBC1A5AA2088004"
}
This BSON document is designed to be:
- Unique for each change event
- Passed back to MongoDB to resume a stream
- Opaque (you should not try to parse or modify it)
Getting Resume Tokens
There are two primary ways to obtain resume tokens from a change stream:
1. From the Change Event Document
Each change event document returned from a change stream includes an _id field that contains the resume token:
// Open a change stream
const changeStream = db.collection('inventory').watch();

// Iterate through the change stream
while (await changeStream.hasNext()) {
  const changeEvent = await changeStream.next();

  // Extract the resume token from the change event
  const resumeToken = changeEvent._id;

  console.log('Change detected:', changeEvent.operationType);
  console.log('Resume token:', resumeToken);

  // Store the resume token somewhere persistent
  await saveResumeToken(resumeToken);
}
2. From the Change Stream Cursor
You can also read the most recent resume token directly from the change stream cursor. In the Node.js driver it is exposed as the resumeToken property (mongosh offers the equivalent cursor.getResumeToken() method):
const changeStream = db.collection('inventory').watch();
// Process some changes...
// Get the latest resume token from the cursor
const resumeToken = changeStream.resumeToken;
console.log('Current resume token:', resumeToken);
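Reading the token from the cursor is handy for periodic checkpointing: on recent servers the cursor's token can advance with each batch (via the server's postBatchResumeToken), even when no events match your filter. Here is a minimal sketch, assuming a saveResumeToken() helper like the ones shown in the storage section below:
const changeStream = db.collection('inventory').watch();

// Checkpoint the latest cursor token every 10 seconds
const checkpointTimer = setInterval(async () => {
  const token = changeStream.resumeToken;
  if (token) {
    await saveResumeToken(token);
  }
}, 10000);

// Remember to clearInterval(checkpointTimer) when closing the stream.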
Resuming a Change Stream
To resume a change stream using a stored resume token:
// Retrieve the stored resume token
const storedResumeToken = await getStoredResumeToken();
// Create a new change stream starting from the stored token
const resumedChangeStream = db.collection('inventory').watch([], {
  resumeAfter: storedResumeToken
});
console.log('Successfully resumed the change stream');

// Continue processing changes
while (await resumedChangeStream.hasNext()) {
  const changeEvent = await resumedChangeStream.next();
  // Process change event...
}
StartAfter vs. ResumeAfter
MongoDB offers two options for resuming a change stream:
- resumeAfter: Resumes notifications after the event identified by the resume token. It cannot be used to resume past an invalidate event (such as a collection drop or rename).
- startAfter: Starts a new change stream after the event identified by the resume token (introduced in MongoDB 4.2). Unlike resumeAfter, it can be used after an invalidate event.
The key difference is their behavior after an invalidate event: resumeAfter fails, while startAfter opens a fresh stream from the same position. Neither option helps once the referenced operation has rolled off the oplog; in that case both fail.
// Using resumeAfter (cannot resume past an invalidate event)
const changeStream1 = db.collection('inventory').watch([], {
  resumeAfter: storedResumeToken
});

// Using startAfter (can open a new stream after an invalidate event)
const changeStream2 = db.collection('inventory').watch([], {
  startAfter: storedResumeToken
});
startAfter is particularly useful when the namespace you are watching is dropped or renamed, since those operations invalidate the original change stream.
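To illustrate, here is a minimal sketch (the collection handle and surrounding async context are assumed) that reopens a stream with startAfter once an invalidate event arrives:
const changeStream = collection.watch();
let invalidateToken = null;

while (await changeStream.hasNext()) {
  const event = await changeStream.next();

  if (event.operationType === 'invalidate') {
    // The stream closes after this event; keep its token
    invalidateToken = event._id;
    break;
  }

  // Process regular events...
}

// resumeAfter would be rejected here; startAfter opens a new stream
// positioned just after the invalidate event
if (invalidateToken) {
  const newStream = collection.watch([], { startAfter: invalidateToken });
  // Continue processing with newStream...
}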
Storing Resume Tokens
For a resilient application, you should persistently store resume tokens. Here are common approaches:
- Database Storage: Store resume tokens in a dedicated MongoDB collection:
async function saveResumeToken(token, streamName) {
  await db.collection('resumeTokens').updateOne(
    { streamName },
    { $set: { token, lastUpdated: new Date() } },
    { upsert: true }
  );
}

async function getStoredResumeToken(streamName) {
  const tokenDoc = await db.collection('resumeTokens')
    .findOne({ streamName });
  return tokenDoc ? tokenDoc.token : null;
}
- File System Storage: Write resume tokens to disk:
const fs = require('fs').promises;
async function saveResumeToken(token, filename = 'resumeToken.json') {
  await fs.writeFile(filename, JSON.stringify(token), 'utf8');
}

async function getStoredResumeToken(filename = 'resumeToken.json') {
  try {
    const data = await fs.readFile(filename, 'utf8');
    return JSON.parse(data);
  } catch (err) {
    if (err.code === 'ENOENT') {
      return null; // File doesn't exist yet
    }
    throw err;
  }
}
Best Practices for Resume Tokens
- Store Frequently: Save resume tokens after processing each batch of events or on a regular interval.
- Store Reliably: Use durable storage with appropriate consistency guarantees.
- Handle Expiration: Resume tokens can expire if they refer to operations that have fallen off the oplog. Implement error handling to restart from the beginning if needed.
- Always Use the Latest: Always use the most recent resume token you've saved.
- Transaction Handling: When possible, store the resume token and commit the corresponding application state changes in a single transaction so the two cannot drift apart (see the sketch after this list).
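For the last point, here is a minimal sketch using the Node.js driver's transaction API; the database name, collection names, and streamName value are illustrative assumptions:
// Apply a change event's effect and persist its resume token atomically.
// Transactions require a replica set or sharded cluster, which change
// streams already require anyway.
async function processEventAtomically(client, change) {
  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      const db = client.db('inventory');

      // Apply the application-level effect of the change event
      await db.collection('appState').updateOne(
        { _id: change.documentKey._id },
        { $set: { lastOperation: change.operationType, updatedAt: new Date() } },
        { upsert: true, session }
      );

      // Persist the resume token in the same transaction
      await db.collection('resumeTokens').updateOne(
        { streamName: 'inventory-products' },
        { $set: { token: change._id, lastUpdated: new Date() } },
        { upsert: true, session }
      );
    });
  } finally {
    await session.endSession();
  }
}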
Real-World Example: Resilient Change Stream Processor
Here's a complete example of a resilient change stream processor that handles disconnections:
const { MongoClient } = require('mongodb');
const fs = require('fs').promises;
// Configuration
const mongoUri = 'mongodb://localhost:27017';
const dbName = 'inventory';
const collectionName = 'products';
const resumeTokenFile = 'resumeToken.json';
// Persistent storage for resume tokens
async function saveResumeToken(token) {
  await fs.writeFile(resumeTokenFile, JSON.stringify(token), 'utf8');
  console.log('Resume token saved');
}

async function loadResumeToken() {
  try {
    const data = await fs.readFile(resumeTokenFile, 'utf8');
    return JSON.parse(data);
  } catch (err) {
    if (err.code === 'ENOENT') {
      console.log('No resume token found, starting from current position');
      return null;
    }
    throw err;
  }
}
// Main function to watch for changes
async function watchCollection() {
  let client;
  try {
    client = new MongoClient(mongoUri);
    await client.connect();
    console.log('Connected to MongoDB');

    const db = client.db(dbName);
    const collection = db.collection(collectionName);

    // Load the saved resume token if one exists
    const resumeToken = await loadResumeToken();

    // Create options for the change stream
    const options = {};
    if (resumeToken) {
      console.log('Resuming change stream from stored token');
      options.resumeAfter = resumeToken;
    }

    // Create and open the change stream
    const changeStream = collection.watch([], options);

    // Set up change stream handler
    changeStream.on('change', async (change) => {
      // Process the change. fullDocument is only present for inserts and
      // replaces (and for updates when the fullDocument: 'updateLookup'
      // option is set), so fall back to documentKey otherwise.
      console.log(`Detected ${change.operationType} operation:`,
        change.fullDocument || change.documentKey
      );

      // Save the resume token after processing
      await saveResumeToken(change._id);
    });

    // Error handling
    changeStream.on('error', async (error) => {
      console.error('Change stream error:', error);
      if (error.code === 286) {
        console.log('Resume token has expired, restarting stream from current position');
        await changeStream.close();

        // Clear the expired resume token
        await fs.unlink(resumeTokenFile).catch(() => {});

        // Restart the watch process
        setTimeout(() => watchCollection(), 1000);
      }
    });

    console.log('Change stream established, waiting for operations...');

    // Keep the process running
    process.on('SIGINT', async () => {
      console.log('Closing change stream and connection');
      await changeStream.close();
      await client.close();
      process.exit(0);
    });
  } catch (error) {
    console.error('Error in watchCollection:', error);
    if (client) {
      await client.close();
    }

    // Implement retry with backoff
    console.log('Retrying connection in 5 seconds...');
    setTimeout(() => watchCollection(), 5000);
  }
}
// Start watching
watchCollection().catch(console.error);
Common Challenges and Solutions
1. Expired Resume Tokens
Problem: If the oplog entry referenced by a resume token is no longer available (due to the oplog size limitation), attempting to resume will fail.
Solution: Catch the ChangeStreamHistoryLost error (code 286) and restart the stream from the current time:
try {
  const changeStream = collection.watch([], { resumeAfter: storedToken });
  // Process changes...
} catch (error) {
  if (error.code === 286) {
    console.log('Resume token expired, starting from current position');
    const freshChangeStream = collection.watch();
    // Process with new stream...
  } else {
    throw error;
  }
}
2. Multiple Consumers
Problem: When multiple applications need to process the same change stream.
Solution: Each consumer should maintain its own resume token:
// Consumer A
const changeStreamA = collection.watch([],
  { resumeAfter: getResumeTokenForConsumerA() });

// Consumer B (independent)
const changeStreamB = collection.watch([],
  { resumeAfter: getResumeTokenForConsumerB() });
3. Filtering and Resume Tokens
Problem: You need to filter the change stream but also resume properly.
Solution: Apply the same pipeline when resuming:
// Original stream with filter
const pipeline = [{ $match: { 'fullDocument.category': 'electronics' } }];
const changeStream = collection.watch(pipeline);
// Later, resume with same pipeline and token
const resumedStream = collection.watch(pipeline,
  { resumeAfter: storedToken });
Limitations of Resume Tokens
- Limited Retention: Resume tokens are only valid as long as the operation they reference is in the oplog.
- Opaque Structure: You can't create or modify tokens manually; they must be obtained from a change stream.
- Version Compatibility: Resume tokens may not be compatible between different MongoDB versions.
Summary
MongoDB Resume Tokens provide a powerful way to build resilient change stream applications. They act as bookmarks in the oplog, allowing your application to pick up exactly where it left off after disruptions.
Key points to remember:
- Resume tokens are obtained from the _id field of each change event or from the change stream cursor's resumeToken accessor
- Use the resumeAfter or startAfter options to resume a change stream
- Store resume tokens in durable storage after processing events
- Handle expired token errors by falling back to a fresh stream
- Implement proper error handling and reconnection logic for production systems
By correctly implementing resume token persistence and recovery, you can build MongoDB change stream applications that are resilient to network failures, application crashes, and maintenance operations.
Further Learning
To deepen your understanding of MongoDB Change Streams and Resume Tokens:
- Try implementing a change stream processor that handles different types of operations (insert, update, delete, etc.)
- Build a distributed system with multiple consumers that process a single change stream
- Experiment with different pipeline stages and see how they interact with resume tokens
- Implement a robust error handling system for change streams in a production environment
- Create a change stream processor that can handle database migrations and schema changes
Happy streaming!