Python MongoDB
MongoDB is a popular NoSQL document database that stores data in flexible, JSON-like documents. Unlike traditional relational databases, MongoDB doesn't require a predefined schema, making it ideal for applications with evolving data requirements. In this tutorial, we'll learn how to work with MongoDB using Python's PyMongo library.
Introduction to MongoDB
MongoDB stores data in collections of documents, where each document is a set of key-value pairs. If you're familiar with Python dictionaries, you'll find MongoDB's data structure very intuitive.
Key characteristics of MongoDB:
- Document-oriented: Stores data in JSON-like BSON (Binary JSON) documents
- Schema-less: Collections don't enforce a document structure by default
- Scalable: Designed for horizontal scaling across multiple servers
- Flexible: Easy to modify and extend document structure
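To make the dictionary analogy concrete, here is a minimal sketch of a document written as a plain Python dictionary. The field names (`title`, `author`, `comments`) are invented for illustration, and the snippet uses only the standard library, so it does not talk to a database.

```python
import json

# A hypothetical blog-post document expressed as a plain Python dict.
# Nested dicts and lists correspond to embedded documents and arrays.
post = {
    "title": "Hello MongoDB",
    "author": {"name": "Mike", "email": "mike@example.com"},
    "tags": ["mongodb", "python"],
    "comments": [
        {"user": "Anna", "text": "Nice post!"},
    ],
}

# Fields are read the same way as in any dictionary.
author_name = post["author"]["name"]
first_comment = post["comments"][0]["text"]

# JSON is a convenient way to view the structure; MongoDB itself stores BSON.
as_json = json.dumps(post, indent=2)
print(as_json)
```

Because the structure is ordinary Python data, adding a field later is just another key assignment, which is what makes the "flexible" point above practical.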
Setting Up PyMongo
To work with MongoDB in Python, we'll use PyMongo, the official MongoDB driver for Python.
Installation
First, let's install PyMongo using pip:
pip install pymongo
Establishing a Connection
To connect to MongoDB, we need to create a client instance:
from pymongo import MongoClient
# Connect to MongoDB running on localhost
client = MongoClient('mongodb://localhost:27017/')
# Alternatively, connect to a remote MongoDB
# client = MongoClient('mongodb://username:password@host:port/')
# Access a database (MongoDB creates it lazily on the first write)
db = client['my_database']
# Access a collection (like a table in SQL)
collection = db['my_collection']
If MongoDB isn't running on your machine, you can create a free MongoDB Atlas account for a cloud-hosted database.
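When connecting to a remote server with a username and password, special characters such as `@` or `/` in the credentials need to be percent-escaped before they go into the connection string. The sketch below shows one way to do that with the standard library; `build_mongo_uri`, the host name, and the credentials are all hypothetical.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, host, port=27017):
    """Return a mongodb:// URI with percent-escaped credentials."""
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb://{user}:{pwd}@{host}:{port}/"


# Hypothetical credentials and host, for illustration only.
uri = build_mongo_uri("app_user", "p@ss/word", "db.example.com")
print(uri)
# The resulting string can then be passed to MongoClient(uri).
```

In real code the credentials would normally come from environment variables or a secrets store rather than being written into the source.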
Basic CRUD Operations
Let's explore how to perform Create, Read, Update, and Delete operations with PyMongo.
Create (Insert) Documents
import datetime
# Insert a single document
post = {
    "author": "Mike",
    "text": "My first blog post!",
    "tags": ["mongodb", "python", "pymongo"],
    "date": datetime.datetime.now(datetime.timezone.utc)
}
post_id = collection.insert_one(post).inserted_id
print(f"Inserted document with ID: {post_id}")
# Insert multiple documents
new_posts = [
    {
        "author": "Jerry",
        "text": "Another post!",
        "tags": ["bulk", "insert"],
        "date": datetime.datetime.now(datetime.timezone.utc)
    },
    {
        "author": "Anna",
        "text": "Learning MongoDB is fun!",
        "tags": ["mongodb", "tutorial"],
        "date": datetime.datetime.now(datetime.timezone.utc)
    }
]
result = collection.insert_many(new_posts)
print(f"Inserted {len(result.inserted_ids)} documents")
Output:
Inserted document with ID: 64b9e5b3a9f1a8e6c4f2d1e0
Inserted 2 documents
Read (Query) Documents
# Find a single document
result = collection.find_one({"author": "Mike"})
print(f"Found document: {result}")
# Find multiple documents
for post in collection.find({"tags": "mongodb"}):
    print(f"Author: {post['author']}, Text: {post['text']}")
# Count documents
count = collection.count_documents({"tags": "mongodb"})
print(f"Number of documents with 'mongodb' tag: {count}")
Output:
Found document: {'_id': ObjectId('64b9e5b3a9f1a8e6c4f2d1e0'), 'author': 'Mike', 'text': 'My first blog post!', 'tags': ['mongodb', 'python', 'pymongo'], 'date': datetime.datetime(2023, 7, 21, 14, 35, 15, 123000)}
Author: Mike, Text: My first blog post!
Author: Anna, Text: Learning MongoDB is fun!
Number of documents with 'mongodb' tag: 2
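The filter `{"tags": "mongodb"}` matches documents whose `tags` array contains `"mongodb"`, not only documents where the field equals that string. The following pure-Python sketch emulates that equality rule on the sample posts to show why Mike's and Anna's posts match; `matches_equality` is a hypothetical helper and only an approximation of the server's behavior.

```python
def matches_equality(document, field, value):
    """Emulate a {field: value} filter on one top-level field.

    An array field matches when any element equals the value
    (or when the whole array equals the value).
    """
    stored = document.get(field)
    if isinstance(stored, list):
        return value in stored or stored == value
    return stored == value


posts = [
    {"author": "Mike", "tags": ["mongodb", "python", "pymongo"]},
    {"author": "Jerry", "tags": ["bulk", "insert"]},
    {"author": "Anna", "tags": ["mongodb", "tutorial"]},
]

matched = [p["author"] for p in posts if matches_equality(p, "tags", "mongodb")]
print(matched)
```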
Update Documents
# Update a single document
update_result = collection.update_one(
    {"author": "Mike"},
    {"$set": {"text": "Updated blog post!"}}
)
print(f"Modified {update_result.modified_count} document")
# Update multiple documents
update_many_result = collection.update_many(
    {"tags": "mongodb"},
    {"$push": {"tags": "updated"}}
)
print(f"Modified {update_many_result.modified_count} documents")
Output:
Modified 1 document
Modified 2 documents
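To see what the `$set` and `$push` operators do to a document, the sketch below applies them to a plain dictionary. `apply_update` is a hypothetical helper that handles only top-level fields; it illustrates the effect of the two operators and is not how the server implements them.

```python
import copy


def apply_update(document, update):
    """Return a copy of `document` with $set and $push applied (top-level fields only)."""
    result = copy.deepcopy(document)
    # $set overwrites the field or creates it if missing.
    for field, value in update.get("$set", {}).items():
        result[field] = value
    # $push appends to an array field, creating the array if missing.
    for field, value in update.get("$push", {}).items():
        result.setdefault(field, []).append(value)
    return result


mike = {
    "author": "Mike",
    "text": "My first blog post!",
    "tags": ["mongodb", "python", "pymongo"],
}
after_set = apply_update(mike, {"$set": {"text": "Updated blog post!"}})
after_push = apply_update(after_set, {"$push": {"tags": "updated"}})
print(after_push)
```

Note that `$push` appends even when the value is already in the array, so running the same update twice would leave two `"updated"` entries.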
Delete Documents
# Delete a single document
delete_result = collection.delete_one({"author": "Jerry"})
print(f"Deleted {delete_result.deleted_count} document")
# Delete multiple documents
delete_many_result = collection.delete_many({"author": {"$in": ["Mike", "Anna"]}})
print(f"Deleted {delete_many_result.deleted_count} documents")
Output:
Deleted 1 document
Deleted 2 documents
Advanced Queries
MongoDB supports powerful querying capabilities. Let's explore some advanced query techniques.
Query Operators
# Let's add more documents for our examples
collection.insert_many([
    {"name": "Product A", "price": 25, "category": "electronics", "in_stock": True},
    {"name": "Product B", "price": 50, "category": "electronics", "in_stock": False},
    {"name": "Product C", "price": 15, "category": "books", "in_stock": True},
    {"name": "Product D", "price": 100, "category": "furniture", "in_stock": True},
    {"name": "Product E", "price": 75, "category": "furniture", "in_stock": False}
])
# Greater than query
for product in collection.find({"price": {"$gt": 50}}):
    print(f"{product['name']} - ${product['price']}")
# Logical AND
for product in collection.find({"category": "electronics", "in_stock": True}):
    print(f"In stock electronics: {product['name']}")
# Logical OR using $or
for product in collection.find({"$or": [{"price": {"$lt": 20}}, {"price": {"$gt": 90}}]}):
    print(f"Very cheap or very expensive: {product['name']} - ${product['price']}")
# IN operator
for product in collection.find({"category": {"$in": ["electronics", "books"]}}):
    print(f"Electronics or books: {product['name']}")
Output:
Product D - $100
Product E - $75
In stock electronics: Product A
Very cheap or very expensive: Product C - $15
Very cheap or very expensive: Product D - $100
Electronics or books: Product A
Electronics or books: Product B
Electronics or books: Product C
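Since filters are ordinary dictionaries, they can be assembled from optional search parameters. The sketch below builds a filter for the sample products from whichever criteria are supplied; `build_product_filter` and its parameter names are hypothetical, and the result would be passed to `collection.find(...)`.

```python
def build_product_filter(category=None, min_price=None, max_price=None, in_stock=None):
    """Build a query filter from optional criteria; omitted criteria are not constrained."""
    query = {}
    if category is not None:
        query["category"] = category
    # Several operators on the same field combine into one sub-document.
    price = {}
    if min_price is not None:
        price["$gte"] = min_price
    if max_price is not None:
        price["$lte"] = max_price
    if price:
        query["price"] = price
    if in_stock is not None:
        query["in_stock"] = in_stock
    return query


query = build_product_filter(category="electronics", min_price=20, max_price=60)
print(query)
# With a live collection: collection.find(query)
```

Against the five sample products, this filter would select Product A and Product B, since both are electronics priced between 20 and 60.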
Sorting and Limiting Results
# Sort by price (ascending)
for product in collection.find().sort("price", 1).limit(3):
    print(f"{product['name']} - ${product['price']}")
print("\n--- Most expensive first ---")
# Sort by price (descending)
for product in collection.find().sort("price", -1).limit(2):
    print(f"{product['name']} - ${product['price']}")
Output:
Product C - $15
Product A - $25
Product B - $50
--- Most expensive first ---
Product D - $100
Product E - $75
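Sorting and limiting are often combined with `skip()` to page through results. The sketch below computes the skip and limit values for a page number and emulates the resulting window on the sample prices; `page_window` is a hypothetical helper, and the list slicing only imitates what the cursor would return.

```python
def page_window(page, page_size):
    """Return (skip, limit) for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    return (page - 1) * page_size, page_size


skip, limit = page_window(page=2, page_size=2)
# With a live collection this would be used as:
#   collection.find().sort("price", 1).skip(skip).limit(limit)

# Emulating the same window on the sample prices, sorted ascending:
prices = sorted([25, 50, 15, 100, 75])
second_page = prices[skip:skip + limit]
print(second_page)
```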
Projections
Projections allow you to return only specific fields from documents:
# Return only name and price, exclude _id
for product in collection.find({}, {"name": 1, "price": 1, "_id": 0}):
    print(product)
Output:
{'name': 'Product A', 'price': 25}
{'name': 'Product B', 'price': 50}
{'name': 'Product C', 'price': 15}
{'name': 'Product D', 'price': 100}
{'name': 'Product E', 'price': 75}
Aggregation Framework
MongoDB's aggregation framework is a powerful way to process and analyze data within MongoDB.
Basic Aggregation Example
# Calculate average price by category
pipeline = [
    {"$group": {"_id": "$category", "avg_price": {"$avg": "$price"}}},
    {"$sort": {"avg_price": -1}}
]
for result in collection.aggregate(pipeline):
    print(f"Category: {result['_id']}, Average Price: ${result['avg_price']}")
Output:
Category: furniture, Average Price: $87.5
Category: electronics, Average Price: $37.5
Category: books, Average Price: $15.0
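To check the arithmetic behind the output above, here is a pure-Python sketch of the same two steps: group the sample products by category, average the prices, then sort by the average in descending order. It is only an illustration of what `$group` and `$sort` compute, not a substitute for running the pipeline on the server.

```python
from collections import defaultdict

# The five sample products from the query examples.
products = [
    {"name": "Product A", "price": 25, "category": "electronics"},
    {"name": "Product B", "price": 50, "category": "electronics"},
    {"name": "Product C", "price": 15, "category": "books"},
    {"name": "Product D", "price": 100, "category": "furniture"},
    {"name": "Product E", "price": 75, "category": "furniture"},
]

# Equivalent of $group with _id "$category": collect prices per category.
prices_by_category = defaultdict(list)
for product in products:
    prices_by_category[product["category"]].append(product["price"])

# Equivalent of {"$avg": "$price"} followed by {"$sort": {"avg_price": -1}}.
averages = [
    {"_id": category, "avg_price": sum(prices) / len(prices)}
    for category, prices in prices_by_category.items()
]
averages.sort(key=lambda row: row["avg_price"], reverse=True)

for row in averages:
    print(f"Category: {row['_id']}, Average Price: ${row['avg_price']}")
```

The practical difference is where the work happens: the pipeline runs inside MongoDB and returns only three small result documents, while this version needs every product loaded into Python first.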
Complex Aggregation
# Count products by category and availability status
pipeline = [
    {"$group": {
        "_id": {"category": "$category", "in_stock": "$in_stock"},
        "count": {"$sum": 1},
        "avg_price": {"$avg": "$price"}
    }},
    {"$sort": {"_id.category": 1, "_id.in_stock": -1}}
]
print("Products by category and availability:")
for result in collection.aggregate(pipeline):
    status = "In Stock" if result["_id"]["in_stock"] else "Out of Stock"
    print(f"{result['_id']['category']} ({status}): {result['count']} products, Avg: ${result['avg_price']}")
Output:
Products by category and availability:
books (In Stock): 1 products, Avg: $15.0
electronics (In Stock): 1 products, Avg: $25.0
electronics (Out of Stock): 1 products, Avg: $50.0
furniture (In Stock): 1 products, Avg: $100.0
furniture (Out of Stock): 1 products, Avg: $75.0
Real-World Application: Product Inventory System
Let's create a simple product inventory management system that demonstrates using MongoDB in a real-world scenario.
import datetime
import pymongo
from pymongo import MongoClient
# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['inventory_system']
products = db['products']
transactions = db['transactions']
# Clear previous data (for demo purposes)
products.delete_many({})
transactions.delete_many({})
# Function to add a product
def add_product(name, category, price, stock):
    product = {
        "name": name,
        "category": category,
        "price": price,
        "stock": stock,
        "created_at": datetime.datetime.now(datetime.timezone.utc)
    }
    result = products.insert_one(product)
    print(f"Added product '{name}' with ID: {result.inserted_id}")
    return result.inserted_id
# Function to update stock
def update_stock(product_id, quantity_change, transaction_type):
    # Find the product
    product = products.find_one({"_id": product_id})
    if not product:
        print("Product not found!")
        return False
    # Calculate new stock
    new_stock = product["stock"] + quantity_change
    if new_stock < 0:
        print(f"Error: Not enough {product['name']} in stock!")
        return False
    # Update stock
    products.update_one({"_id": product_id}, {"$set": {"stock": new_stock}})
    # Record transaction
    transaction = {
        "product_id": product_id,
        "product_name": product["name"],
        "quantity_change": quantity_change,
        "type": transaction_type,
        "timestamp": datetime.datetime.now(datetime.timezone.utc)
    }
    transactions.insert_one(transaction)
    print(f"Updated stock for '{product['name']}'. New stock: {new_stock}")
    return True
# Function to get low stock products
def get_low_stock_products(threshold=5):
    low_stock = list(products.find({"stock": {"$lt": threshold}}))
    return low_stock
# Function to get transaction history for a product
def get_product_history(product_id):
    return list(transactions.find({"product_id": product_id}).sort("timestamp", -1))
# Demo the inventory system
# Add products
laptop_id = add_product("Laptop", "electronics", 1200, 10)
keyboard_id = add_product("Keyboard", "accessories", 80, 20)
mouse_id = add_product("Mouse", "accessories", 50, 3)
# Update stock (sell items)
update_stock(laptop_id, -2, "sale")
update_stock(mouse_id, -2, "sale")
# Update stock (receive shipment)
update_stock(keyboard_id, 5, "purchase")
# Check low stock products
print("\nLow stock products:")
for product in get_low_stock_products():
    print(f"- {product['name']}: {product['stock']} remaining")
# Check transaction history
print("\nTransaction history for Mouse:")
for transaction in get_product_history(mouse_id):
    change = transaction['quantity_change']
    direction = "out" if change < 0 else "in"
    print(f"- {abs(change)} {direction} ({transaction['type']}) on {transaction['timestamp']}")
# Show all products with current stock
print("\nCurrent inventory:")
for product in products.find():
    print(f"- {product['name']}: {product['stock']} units at ${product['price']} each")
Output:
Added product 'Laptop' with ID: 64b9e5d3a9f1a8e6c4f2d1e1
Added product 'Keyboard' with ID: 64b9e5d3a9f1a8e6c4f2d1e2
Added product 'Mouse' with ID: 64b9e5d3a9f1a8e6c4f2d1e3
Updated stock for 'Laptop'. New stock: 8
Updated stock for 'Mouse'. New stock: 1
Updated stock for 'Keyboard'. New stock: 25
Low stock products:
- Mouse: 1 remaining
Transaction history for Mouse:
- 2 out (sale) on 2023-07-21 14:35:15.123000
Current inventory:
- Laptop: 8 units at $1200 each
- Keyboard: 25 units at $80 each
- Mouse: 1 units at $50 each
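One limitation of `update_stock` is that it reads the stock and writes the new value in two separate steps, so two sales running at the same time could both pass the check and oversell. A possible alternative, sketched below, lets the server do the check and the change in a single `update_one` call using `$inc` and a stock condition in the filter. `adjust_stock_atomically` is a hypothetical helper that is not part of the system above, and it leaves out the transaction record for brevity.

```python
def adjust_stock_atomically(products, product_id, quantity_change):
    """Apply quantity_change with $inc, refusing changes that would make stock negative.

    `products` is assumed to be a PyMongo collection (or any object with a
    compatible update_one method).
    """
    # Stock that must be available for the change to be allowed
    # (0 for incoming shipments, the sold quantity for sales).
    required = max(0, -quantity_change)
    result = products.update_one(
        {"_id": product_id, "stock": {"$gte": required}},
        {"$inc": {"stock": quantity_change}},
    )
    # matched_count is 0 when the product is missing or stock is insufficient.
    return result.matched_count == 1
```

With the demo data, `adjust_stock_atomically(products, mouse_id, -2)` would be expected to succeed once and then return `False` on a second call, because only one unit would remain. The trade-off is that a `False` result no longer says whether the product was missing or merely out of stock.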
Indexes for Performance
As your collections grow, adding indexes becomes important for query performance.
# Create an index on the 'category' field
products.create_index([("category", pymongo.ASCENDING)])
# Create a compound index on multiple fields
products.create_index([
    ("category", pymongo.ASCENDING),
    ("price", pymongo.DESCENDING)
])
# List all indexes on a collection
print("Available indexes:")
for index in products.list_indexes():
    print(f"- {index['name']}: {index['key']}")
Output:
Available indexes:
- _id_: SON([('_id', 1)])
- category_1: SON([('category', 1)])
- category_1_price_-1: SON([('category', 1), ('price', -1)])
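The index names in the output follow a `field_direction` pattern joined by underscores. The small sketch below reproduces the names shown above from their key specifications; `default_index_name` is a hypothetical helper written for illustration and is not part of PyMongo's public API.

```python
def default_index_name(keys):
    """Join (field, direction) pairs in the field_direction style seen in the output."""
    return "_".join(f"{field}_{direction}" for field, direction in keys)


print(default_index_name([("category", 1)]))
print(default_index_name([("category", 1), ("price", -1)]))
```

If a more descriptive name is preferred, `create_index` also accepts a `name` keyword argument.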
Summary
In this tutorial, we've covered:
- Setting up PyMongo: Connecting to MongoDB databases and collections
- Basic CRUD operations: Creating, reading, updating, and deleting documents
- Advanced queries: Using operators, sorting, and projections
- Aggregation framework: Processing and analyzing data within MongoDB
- Real-world application: Building a simple inventory management system
- Performance optimization: Creating indexes for better query performance
MongoDB's document-oriented approach offers flexibility that traditional relational databases don't provide. It's particularly well-suited for applications with evolving schemas, large amounts of data, or those requiring horizontal scalability.
Additional Resources
- PyMongo Documentation
- MongoDB University - Free MongoDB courses
- MongoDB Atlas - Cloud-hosted MongoDB service
Exercises
- Create a simple blog application that allows creating, reading, updating, and deleting posts.
- Implement a product search feature that allows filtering by category, price range, and availability.
- Create an analytics dashboard that shows the most sold products and revenue by category using the aggregation framework.
- Implement data validation using MongoDB's schema validation feature.
- Create a function that backs up a collection to a JSON file and another function that can restore from that backup.
By practicing these exercises, you'll strengthen your understanding of MongoDB and PyMongo while building practical applications.