MongoDB ETL Process
Introduction
Extract, Transform, Load (ETL) is a crucial process in data migration that involves extracting data from source systems, transforming it to fit operational needs, and loading it into the target database. When working with MongoDB, ETL processes have their own unique considerations due to MongoDB's document-oriented nature and flexible schema.
In this guide, we'll explore how to perform ETL with MongoDB, providing you with practical knowledge to migrate data efficiently between MongoDB databases or from other data sources into MongoDB.
What is ETL?
Before diving into MongoDB-specific ETL, let's understand the three key components:
- Extract: Retrieving data from one or more sources
- Transform: Converting the data into an appropriate format for the target system
- Load: Inserting the transformed data into the target database
MongoDB ETL Tools and Approaches
MongoDB ETL processes can be implemented using various tools:
- MongoDB native tools (mongodump/mongorestore, mongoexport/mongoimport)
- MongoDB Compass (GUI tool with import/export capabilities)
- Custom scripts using MongoDB drivers
- Third-party ETL tools (Talend, Informatica, etc.)
- Cloud services like MongoDB Atlas Data Lake
Let's explore these approaches in detail.
Basic MongoDB ETL Using Native Tools
Simple Export and Import
For straightforward migrations between MongoDB instances, you can use the native command-line tools:
Export Data from Source
mongoexport --uri="mongodb://source-server:27017/sourcedb" \
  --collection=customers \
  --out=customers.json
Import Data to Target
mongoimport --uri="mongodb://target-server:27017/targetdb" \
  --collection=customers \
  --file=customers.json
This approach works well for simple migrations but lacks transformation capabilities.
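That said, mongoexport does support basic filtering and field selection at extract time via --query and --fields; anything beyond that (renaming, restructuring, computed fields) needs a script. A sketch, where the status field and the selected field names are illustrative for this dataset:
mongoexport --uri="mongodb://source-server:27017/sourcedb" \
  --collection=customers \
  --query='{"status": "active"}' \
  --fields=firstName,lastName,email \
  --out=active-customers.json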
Full Database Migration with mongodump and mongorestore
For complete database migrations:
# Export entire database
mongodump --uri="mongodb://source-server:27017/sourcedb" --out=/backup/directory
# Import to target
mongorestore --uri="mongodb://target-server:27017/targetdb" /backup/directory/sourcedb
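If you dump an entire deployment rather than a single database, mongorestore can remap namespaces during the restore with --nsFrom and --nsTo. A hedged sketch (the database names are illustrative, and --drop discards existing target data, so use it with care):
mongorestore --uri="mongodb://target-server:27017" \
  --nsFrom="sourcedb.*" \
  --nsTo="targetdb.*" \
  --drop \
  /backup/directory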
Custom ETL Scripts for MongoDB
For more complex transformations, custom scripts provide flexibility. Here's an example using Node.js:
const { MongoClient } = require('mongodb');

async function performETL() {
  // Source and target connections
  const sourceClient = new MongoClient('mongodb://source-server:27017');
  const targetClient = new MongoClient('mongodb://target-server:27017');

  try {
    // Connect to both databases
    await sourceClient.connect();
    await targetClient.connect();

    const sourceDb = sourceClient.db('sourcedb');
    const targetDb = targetClient.db('targetdb');

    // Extract data from source
    const customers = await sourceDb.collection('customers').find({}).toArray();

    // Transform data
    const transformedCustomers = customers.map(customer => {
      return {
        fullName: `${customer.firstName} ${customer.lastName}`,
        email: customer.email,
        // Combine address fields
        address: {
          street: customer.addressLine1,
          city: customer.city,
          state: customer.state,
          zipCode: customer.zip
        },
        // Add new calculated field
        customerSince: new Date(customer.createdAt)
        // Remove sensitive information
        // (no SSN or credit card info included)
      };
    });

    // Load data into target
    if (transformedCustomers.length > 0) {
      const result = await targetDb.collection('customers').insertMany(transformedCustomers);
      console.log(`${result.insertedCount} documents inserted`);
    }
  } catch (err) {
    console.error('ETL process failed:', err);
  } finally {
    // Close connections
    await sourceClient.close();
    await targetClient.close();
  }
}

performETL().catch(console.error);
This script:
- Connects to both source and target MongoDB instances
- Extracts customer data from the source
- Transforms the data by restructuring fields and calculating new values
- Loads the transformed data into the target database
Advanced ETL Techniques for MongoDB
Incremental ETL
For large databases, incremental ETL is more efficient than a full migration. Here's one way to implement it:
const { MongoClient } = require('mongodb');

async function incrementalETL() {
  const sourceClient = new MongoClient('mongodb://source-server:27017');
  const targetClient = new MongoClient('mongodb://target-server:27017');

  try {
    await sourceClient.connect();
    await targetClient.connect();

    const sourceDb = sourceClient.db('sourcedb');
    const targetDb = targetClient.db('targetdb');

    // Get the timestamp of the last ETL run
    const etlMetadata = await targetDb.collection('etl_metadata').findOne({ process: 'customer_migration' });
    const lastRunTimestamp = etlMetadata ? etlMetadata.lastRun : new Date(0);

    // Extract only new or modified data since last run
    const newCustomers = await sourceDb.collection('customers')
      .find({ lastModified: { $gt: lastRunTimestamp } })
      .toArray();

    console.log(`Found ${newCustomers.length} customers modified since last run`);

    // Transform and load as before
    // ...transformation code similar to previous example...

    // Update the last run timestamp
    await targetDb.collection('etl_metadata').updateOne(
      { process: 'customer_migration' },
      { $set: { lastRun: new Date() } },
      { upsert: true }
    );
  } catch (err) {
    console.error('Incremental ETL failed:', err);
  } finally {
    await sourceClient.close();
    await targetClient.close();
  }
}

incrementalETL().catch(console.error);
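This pattern assumes the source collection maintains a lastModified field on every write. If that assumption holds, an index on the field keeps the incremental extract query fast; a minimal sketch:
// Assumes writes to the source collection keep lastModified up to date;
// this index supports the { lastModified: { $gt: ... } } extract query
await sourceDb.collection('customers').createIndex({ lastModified: 1 });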
Handling Schema Evolution
MongoDB's flexible schema can lead to documents with different structures in the same collection. Here's how to handle this during ETL:
function transformCustomer(customer) {
  // Create a standard output format
  const transformedCustomer = {
    fullName: '',
    email: '',
    address: {
      street: '',
      city: '',
      state: '',
      zipCode: ''
    },
    customerSince: new Date(),
    accountStatus: 'active'
  };

  // Handle different schema versions
  if (customer.firstName && customer.lastName) {
    transformedCustomer.fullName = `${customer.firstName} ${customer.lastName}`;
  } else if (customer.name) {
    transformedCustomer.fullName = customer.name;
  }

  // Handle email field
  transformedCustomer.email = customer.email || customer.emailAddress || '';

  // Handle different address formats
  if (customer.address && typeof customer.address === 'object') {
    // New schema with address object
    transformedCustomer.address = {
      street: customer.address.street || '',
      city: customer.address.city || '',
      state: customer.address.state || '',
      zipCode: customer.address.zipCode || customer.address.postalCode || ''
    };
  } else {
    // Old schema with separate address fields
    transformedCustomer.address = {
      street: customer.addressLine1 || '',
      city: customer.city || '',
      state: customer.state || '',
      zipCode: customer.zip || ''
    };
  }

  // Handle date fields
  transformedCustomer.customerSince =
    customer.customerSince || customer.createdAt || new Date();

  return transformedCustomer;
}
Real-World ETL Example: Migration from SQL to MongoDB
Let's look at a practical example of migrating data from a SQL database to MongoDB:
const { MongoClient } = require('mongodb');
const mysql = require('mysql2/promise');

async function sqlToMongoETL() {
  // Connect to MySQL source
  const mysqlConn = await mysql.createConnection({
    host: 'mysql-server',
    user: 'user',
    password: 'password',
    database: 'ecommerce'
  });

  // Connect to MongoDB target
  const mongoClient = await MongoClient.connect('mongodb://mongo-server:27017');
  const mongoDb = mongoClient.db('ecommerce');

  try {
    // Extract customers and their orders from MySQL
    const [customers] = await mysqlConn.execute('SELECT * FROM customers');

    for (const customer of customers) {
      // Get customer's orders
      const [orders] = await mysqlConn.execute(
        'SELECT * FROM orders WHERE customer_id = ?',
        [customer.id]
      );

      // Get order items for each order
      for (let i = 0; i < orders.length; i++) {
        const [items] = await mysqlConn.execute(
          'SELECT * FROM order_items WHERE order_id = ?',
          [orders[i].id]
        );
        orders[i].items = items;
      }

      // Transform to MongoDB document structure
      const customerDoc = {
        _id: customer.id,
        name: `${customer.first_name} ${customer.last_name}`,
        email: customer.email,
        address: {
          street: customer.address,
          city: customer.city,
          state: customer.state,
          zipCode: customer.zip_code
        },
        orders: orders.map(order => ({
          orderId: order.id,
          orderDate: order.order_date,
          status: order.status,
          total: order.total_amount,
          items: order.items.map(item => ({
            productId: item.product_id,
            quantity: item.quantity,
            price: item.price
          }))
        }))
      };

      // Load to MongoDB - using upsert to handle updates
      await mongoDb.collection('customers').updateOne(
        { _id: customerDoc._id },
        { $set: customerDoc },
        { upsert: true }
      );
    }

    console.log('Migration completed successfully');
  } catch (err) {
    console.error('Migration failed:', err);
  } finally {
    await mysqlConn.end();
    await mongoClient.close();
  }
}

sqlToMongoETL().catch(console.error);
This script:
- Extracts normalized data from multiple SQL tables
- Transforms it into a denormalized MongoDB document structure
- Loads each customer with their orders as a single document
ETL Best Practices for MongoDB
1. Optimize for MongoDB's Document Model
When transforming data for MongoDB, embrace the document model, as sketched after the list below:
- Denormalize data that's frequently accessed together
- Embed related data in a single document for efficient queries
- Use references for large datasets or when data is shared across multiple documents
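For example, a customer's orders are typically read together with the customer and can be embedded, while a large product catalog shared across many orders is better referenced by ID. A sketch with illustrative field names:
// Embedded: order summaries that are almost always read with the customer
const customerDoc = {
  _id: 101,
  fullName: 'Ada Lovelace',
  orders: [
    { orderId: 5001, orderDate: new Date('2024-01-15'), total: 99.5 }
  ]
};

// Referenced: products are large and shared, so order items store only an ID
const orderItem = { productId: 'prod-123', quantity: 2, price: 49.75 };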
2. Manage Bulk Operations Efficiently
// More efficient bulk operations
const batch = [];
const batchSize = 1000;

for (const customer of customers) {
  batch.push({
    replaceOne: {
      filter: { _id: customer._id },
      replacement: customer,
      upsert: true
    }
  });

  if (batch.length >= batchSize) {
    await targetCollection.bulkWrite(batch);
    batch.length = 0;
  }
}

// Process any remaining documents
if (batch.length > 0) {
  await targetCollection.bulkWrite(batch);
}
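By default, bulkWrite executes operations in order and stops at the first error. If ordering doesn't matter for your migration, an unordered write lets MongoDB continue past individual failures and can improve throughput:
// Unordered: remaining operations still run if one document fails
await targetCollection.bulkWrite(batch, { ordered: false });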
3. Implement Error Handling and Logging
try {
  // ETL operations
} catch (err) {
  console.error(`ETL error: ${err.message}`);
  await logCollection.insertOne({
    timestamp: new Date(),
    operation: 'customer_migration',
    error: err.message,
    stackTrace: err.stack
  });
  // Depending on error, decide whether to retry or continue with other records
}
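For transient failures such as network timeouts, a small retry helper is often enough before escalating to the log. A minimal sketch (the retry count and backoff are illustrative):
async function withRetry(operation, maxRetries = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (err) {
      if (attempt === maxRetries) throw err;
      console.warn(`Attempt ${attempt} failed, retrying: ${err.message}`);
      // Simple linear backoff before the next attempt
      await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
    }
  }
}

// Usage: wrap an individual load step
// await withRetry(() => targetCollection.bulkWrite(batch));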
4. Monitor Performance and Resource Usage
Keep track of ETL performance metrics:
const startTime = Date.now();

// ETL process

const endTime = Date.now();
const processingTime = endTime - startTime;

await metricsCollection.insertOne({
  process: 'customer_migration',
  startTime: new Date(startTime),
  endTime: new Date(endTime),
  processingTimeMs: processingTime,
  recordsProcessed: customers.length,
  throughput: (customers.length / (processingTime / 1000)).toFixed(2) + ' records/sec'
});
Building a Reusable ETL Framework for MongoDB
For ongoing ETL needs, consider building a reusable framework:
const { MongoClient } = require('mongodb');

class MongoETL {
  constructor(sourceUri, targetUri, options = {}) {
    this.sourceUri = sourceUri;
    this.targetUri = targetUri;
    this.batchSize = options.batchSize || 1000;
    this.transformers = {};
  }

  // Register a transformer for a specific collection
  registerTransformer(collection, transformFn) {
    this.transformers[collection] = transformFn;
  }

  // Process a collection
  async processCollection(sourceCollection, targetCollection) {
    const sourceClient = new MongoClient(this.sourceUri);
    const targetClient = new MongoClient(this.targetUri);

    try {
      await sourceClient.connect();
      await targetClient.connect();

      // db() with no argument uses the database named in the connection URI
      const sourceDb = sourceClient.db();
      const targetDb = targetClient.db();

      const source = sourceDb.collection(sourceCollection);
      const target = targetDb.collection(targetCollection);

      // Get transformer or use identity function
      const transform = this.transformers[sourceCollection] || (doc => doc);

      // Process in batches
      const cursor = source.find({});
      let batch = [];

      while (await cursor.hasNext()) {
        const doc = await cursor.next();
        const transformedDoc = transform(doc);

        batch.push({
          replaceOne: {
            filter: { _id: transformedDoc._id },
            replacement: transformedDoc,
            upsert: true
          }
        });

        if (batch.length >= this.batchSize) {
          await target.bulkWrite(batch);
          batch = [];
        }
      }

      if (batch.length > 0) {
        await target.bulkWrite(batch);
      }

      console.log(`Completed ETL for ${sourceCollection} to ${targetCollection}`);
    } finally {
      await sourceClient.close();
      await targetClient.close();
    }
  }
}
// Usage example
async function runCustomerETL() {
  const etl = new MongoETL(
    'mongodb://source:27017/sourcedb',
    'mongodb://target:27017/targetdb',
    { batchSize: 500 }
  );

  // Register transformer
  etl.registerTransformer('customers', (customer) => {
    return {
      _id: customer._id,
      fullName: `${customer.firstName} ${customer.lastName}`
      // other transformations
    };
  });

  // Run ETL process
  await etl.processCollection('customers', 'customers');
}

runCustomerETL().catch(console.error);
Summary
MongoDB ETL processes are essential for efficient data migration and integration. We've covered:
- Basic ETL concepts and their application to MongoDB
- Tools and techniques for MongoDB data migration
- Custom ETL scripts for complex transformations
- Advanced ETL patterns such as incremental loading and schema evolution
- Real-world examples including SQL to MongoDB migration
- Best practices for optimizing MongoDB ETL processes
- Building reusable ETL frameworks for ongoing data integration needs
By understanding these concepts and techniques, you can implement efficient and reliable ETL processes for your MongoDB data migration projects.
Additional Resources and Exercises
Resources
- MongoDB Documentation on Data Migration
- MongoDB University - Data Modeling Courses
- MongoDB Compass - Data Import/Export Features
Practice Exercises
- Basic Exercise: Create a simple ETL script to migrate data between two MongoDB collections, transforming at least three fields during the process.
- Intermediate Exercise: Implement an incremental ETL process that tracks which documents have been processed and only migrates new or modified data.
- Advanced Exercise: Build a complete ETL pipeline that migrates data from a CSV file or SQL database into MongoDB, with proper error handling, logging, and performance metrics.
- Challenge: Extend your ETL framework to handle multiple collections with different transformations, parallelizing the work for better performance.
By working through these exercises, you'll gain practical experience with MongoDB ETL processes and be well-prepared for real-world data migration tasks.