MongoDB Python Driver
MongoDB is a popular NoSQL database that stores data in flexible, JSON-like documents. When building Python applications that need to interact with MongoDB, you'll use a database driver - a library that allows your application to connect to the database and perform operations. The official MongoDB driver for Python is called PyMongo.
Introduction to PyMongo
PyMongo lets Python applications connect to MongoDB, run queries, and manage databases, collections, and indexes. Whether you're building a web application, a data analysis tool, or any other Python project that needs to store and retrieve data, PyMongo covers the common data-access operations.
In this tutorial, we'll cover:
- Installing PyMongo
- Connecting to MongoDB
- Basic CRUD operations (Create, Read, Update, Delete)
- Working with documents and collections
- Advanced operations and best practices
Getting Started
Installing PyMongo
Before you can use PyMongo in your Python applications, you'll need to install it. You can do this using pip:
pip install pymongo
If you need to connect to MongoDB Atlas (MongoDB's cloud service), you might also want to install the dnspython package, which the srv extra pulls in:
pip install "pymongo[srv]"
Connecting to MongoDB
To connect to a MongoDB database, you'll first need to create a client that connects to your MongoDB server:
import pymongo
# Connect to MongoDB running on localhost
client = pymongo.MongoClient("mongodb://localhost:27017/")
# Alternatively, connect to MongoDB Atlas
# client = pymongo.MongoClient("mongodb+srv://<username>:<password>@<cluster-url>/test")
# Access a database
db = client["mydatabase"]
# Access a collection
collection = db["customers"]
# Check connection
try:
    client.admin.command('ping')
    print("Connected successfully!")
except Exception as e:
    print(f"Connection failed: {e}")
In the code above:
- We create a MongoClient instance that connects to MongoDB running on localhost.
- We access a database called "mydatabase". If it doesn't exist yet, MongoDB creates it lazily the first time data is written to it.
- We access a collection called "customers" within that database.
- We perform a ping command to verify the connection.
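Connection strings that embed a username and password need those values percent-encoded when they contain characters such as @, : or /. The sketch below shows one way to do that with the standard library. build_mongo_uri is a hypothetical helper (not part of PyMongo), and the user name, password, and host are placeholder values.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, host="localhost:27017", srv=False):
    """Return a MongoDB connection string with percent-encoded credentials.

    Hypothetical helper for illustration; not part of PyMongo.
    """
    scheme = "mongodb+srv" if srv else "mongodb"
    return f"{scheme}://{quote_plus(username)}:{quote_plus(password)}@{host}/"


uri = build_mongo_uri("app_user", "p@ss:word/1")
print(uri)  # mongodb://app_user:p%40ss%3Aword%2F1@localhost:27017/
```

The resulting string can be passed directly to pymongo.MongoClient(uri).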
Basic CRUD Operations
Creating Documents
In MongoDB, data is stored in documents, which are organized into collections. Let's see how to insert documents into a collection:
# Insert a single document
customer = {
    "name": "John Smith",
    "email": "[email protected]",
    "age": 30,
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "state": "NY",
        "zip": "10001"
    },
    "active": True
}
result = collection.insert_one(customer)
print(f"Inserted document with ID: {result.inserted_id}")
# Insert multiple documents
customers = [
    {
        "name": "Jane Doe",
        "email": "[email protected]",
        "age": 25,
        "active": True
    },
    {
        "name": "Mike Johnson",
        "email": "[email protected]",
        "age": 35,
        "active": False
    }
]
result = collection.insert_many(customers)
print(f"Inserted {len(result.inserted_ids)} documents")
print(f"IDs: {result.inserted_ids}")
Output:
Inserted document with ID: 64f5a7d2e131d7b8c9a1e2f3
Inserted 2 documents
IDs: [ObjectId('64f5a7d2e131d7b8c9a1e2f4'), ObjectId('64f5a7d2e131d7b8c9a1e2f5')]
Reading Documents
To retrieve documents from a collection, you can use the find and find_one methods:
# Find a single document
customer = collection.find_one({"name": "John Smith"})
if customer:
    print(f"Found customer: {customer['name']}, Email: {customer['email']}")
else:
    print("Customer not found")
# Find multiple documents
active_customers = collection.find({"active": True})
print("Active customers:")
for customer in active_customers:
    print(f"- {customer['name']} ({customer['email']})")
# Find with query operators
young_customers = collection.find({"age": {"$lt": 30}})
print("\nCustomers younger than 30:")
for customer in young_customers:
    print(f"- {customer['name']} (Age: {customer['age']})")
Output:
Found customer: John Smith, Email: [email protected]
Active customers:
- John Smith ([email protected])
- Jane Doe ([email protected])
Customers younger than 30:
- Jane Doe (Age: 25)
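Query filters are plain Python dictionaries, so they can be assembled programmatically before being passed to find. A minimal sketch, assuming the customer fields used above; customer_filter is a hypothetical helper, not a PyMongo function.

```python
def customer_filter(min_age=None, max_age=None, active=None):
    """Build a filter document for find(); hypothetical helper for illustration."""
    query = {}
    age = {}
    if min_age is not None:
        age["$gte"] = min_age  # age >= min_age
    if max_age is not None:
        age["$lt"] = max_age   # age < max_age
    if age:
        query["age"] = age
    if active is not None:
        query["active"] = active
    return query


print(customer_filter(min_age=18, max_age=30, active=True))
# {'age': {'$gte': 18, '$lt': 30}, 'active': True}
```

Calling collection.find(customer_filter(max_age=30)) would send the same filter as the "younger than 30" query above.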
Updating Documents
To modify existing documents, you can use the update_one and update_many methods:
import datetime
# Update a single document
result = collection.update_one(
    {"name": "John Smith"},
    {"$set": {"age": 31, "last_updated": datetime.datetime.now()}}
)
print(f"Modified {result.modified_count} document(s)")
# Update multiple documents
result = collection.update_many(
    {"active": True},
    {"$set": {"account_type": "standard"}}
)
print(f"Modified {result.modified_count} document(s)")
# Upsert (insert if not exists)
result = collection.update_one(
    {"email": "[email protected]"},
    {"$set": {"name": "Robert Brown", "age": 40, "active": True}},
    upsert=True
)
if result.upserted_id:
    print(f"Inserted new document with ID: {result.upserted_id}")
else:
    print(f"Modified {result.modified_count} document(s)")
Output:
Modified 1 document(s)
Modified 2 document(s)
Inserted new document with ID: 64f5a7d2e131d7b8c9a1e2f6
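An update result distinguishes documents that matched the filter from documents that actually changed, through its matched_count, modified_count, and upserted_id attributes. The sketch below turns those three values into a readable message. describe_update is a hypothetical helper, and the SimpleNamespace object is only a stand-in with the same attribute names so the example runs without a database.

```python
from types import SimpleNamespace


def describe_update(result):
    """Summarize an update result from its matched/modified/upserted fields."""
    if result.upserted_id is not None:
        return f"inserted new document {result.upserted_id}"
    if result.matched_count == 0:
        return "no document matched the filter"
    if result.modified_count == 0:
        return "document matched but already had these values"
    return f"modified {result.modified_count} document(s)"


# Stand-in for a real update result (assumption: same attribute names).
unchanged = SimpleNamespace(matched_count=1, modified_count=0, upserted_id=None)
print(describe_update(unchanged))  # document matched but already had these values
```

With a real collection, the value returned by update_one or update_many can be passed in directly.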
Deleting Documents
To remove documents from a collection, you can use the delete_one and delete_many methods:
# Delete a single document
result = collection.delete_one({"name": "Mike Johnson"})
print(f"Deleted {result.deleted_count} document(s)")
# Delete multiple documents
result = collection.delete_many({"active": False})
print(f"Deleted {result.deleted_count} document(s)")
# Delete all documents
# result = collection.delete_many({})
# print(f"Deleted {result.deleted_count} document(s)")
Output:
Deleted 1 document(s)
Deleted 0 document(s)
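Because delete_many with an empty filter removes every document in the collection, it can be worth guarding against an accidentally empty filter. A minimal sketch of such a guard; delete_matching is a hypothetical wrapper, not a PyMongo function.

```python
def delete_matching(collection, query):
    """Delete documents matching query, refusing an empty filter."""
    if not query:
        raise ValueError("refusing to delete with an empty filter")
    return collection.delete_many(query).deleted_count


# The guard fires before the collection is touched, so None is enough
# to demonstrate it without a database.
try:
    delete_matching(None, {})
except ValueError as exc:
    print(exc)  # refusing to delete with an empty filter
```

With a real collection, delete_matching(collection, {"active": False}) returns the number of deleted documents.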
Working with Collections
Checking if a Collection Exists
collections = db.list_collection_names()
if "customers" in collections:
    print("Customers collection exists")
Creating Indexes
Indexes improve query performance. Here's how to create them:
# Create a single field index
collection.create_index("email", unique=True)
# Create a compound index
collection.create_index([("name", pymongo.ASCENDING), ("age", pymongo.DESCENDING)])
# View indexes
indexes = collection.index_information()
print("Collection indexes:")
for index_name, index_info in indexes.items():
    print(f"- {index_name}: {index_info}")
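The dictionary returned by index_information() maps each index name to its details, including a "key" list of (field, direction) pairs. The sketch below looks up an index by its key pattern. find_index_name is a hypothetical helper, and the sample dictionary is illustrative data shaped like that output rather than real server output.

```python
def find_index_name(index_info, keys):
    """Return the name of the index whose key pattern equals keys, else None.

    index_info is a dictionary shaped like collection.index_information().
    """
    wanted = [tuple(pair) for pair in keys]
    for name, info in index_info.items():
        if [tuple(pair) for pair in info["key"]] == wanted:
            return name
    return None


# Illustrative sample shaped like index_information() output.
sample = {
    "_id_": {"v": 2, "key": [("_id", 1)]},
    "email_1": {"v": 2, "key": [("email", 1)], "unique": True},
    "name_1_age_-1": {"v": 2, "key": [("name", 1), ("age", -1)]},
}
print(find_index_name(sample, [("name", 1), ("age", -1)]))  # name_1_age_-1
```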
Dropping Collections and Databases
# Drop a collection
db.drop_collection("test_collection")
# Drop a database
# client.drop_database("test_database")
Advanced Operations
Aggregation Pipeline
The aggregation pipeline is a powerful tool for data analysis and transformation:
# Example: Group customers by age and count them
pipeline = [
    {"$match": {"active": True}},
    {"$group": {"_id": "$age", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}}
]
results = collection.aggregate(pipeline)
print("Customer counts by age:")
for result in results:
    print(f"Age {result['_id']}: {result['count']} customer(s)")
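To make the three stages concrete, here is a pure-Python sketch of what the pipeline above computes, run against a small in-memory list instead of a collection. The sample documents and the count_active_by_age name are assumptions for illustration. The order of groups with equal counts may differ on a real server.

```python
from collections import Counter


def count_active_by_age(docs):
    """Mimic the $match / $group / $sort pipeline on a list of dicts."""
    # $match: keep only active customers
    active = [d for d in docs if d.get("active") is True]
    # $group: count documents per age value
    counts = Counter(d.get("age") for d in active)
    # $sort: highest count first
    return [{"_id": age, "count": n} for age, n in counts.most_common()]


sample_customers = [
    {"name": "A", "age": 30, "active": True},
    {"name": "B", "age": 25, "active": True},
    {"name": "C", "age": 30, "active": True},
    {"name": "D", "age": 35, "active": False},
]
print(count_active_by_age(sample_customers))
# [{'_id': 30, 'count': 2}, {'_id': 25, 'count': 1}]
```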
Text Search
You can create text indexes for efficient text search:
# Create a text index
collection.create_index([("name", "text"), ("email", "text")])
# Perform a text search
results = collection.find({"$text": {"$search": "Smith"}})
print("Search results for 'Smith':")
for result in results:
    print(f"- {result['name']} ({result['email']})")
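Text search results are not returned in relevance order by default. One common approach is to project the textScore metadata and sort on it. The sketch below only builds the three arguments involved; text_search_args is a hypothetical helper, and "score" is an arbitrary field name chosen for the example.

```python
def text_search_args(term):
    """Return (filter, projection, sort) for a relevance-ranked text search."""
    score = {"$meta": "textScore"}
    query = {"$text": {"$search": term}}
    projection = {"score": score}   # adds a "score" field to each result
    sort = [("score", score)]       # orders results by that score
    return query, projection, sort


query, projection, sort = text_search_args("Smith")
print(query)  # {'$text': {'$search': 'Smith'}}

# With a real collection that has a text index:
# results = collection.find(query, projection).sort(sort)
```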
Bulk Operations
When you need to perform many write operations, bulk_write sends them to the server together, which reduces network round trips:
from pymongo import InsertOne, UpdateOne, DeleteOne
# Initialize bulk operations
bulk_operations = [
    InsertOne({"name": "Alex Wilson", "email": "[email protected]", "age": 28}),
    UpdateOne({"name": "John Smith"}, {"$set": {"age": 32}}),
    DeleteOne({"name": "Jane Doe"})
]
# Execute bulk operations
result = collection.bulk_write(bulk_operations)
print(f"Bulk operation results: {result.bulk_api_result}")
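When the documents come from a large or streaming source, building one huge list of operations can be wasteful. A minimal sketch of splitting any iterable into fixed-size batches using only the standard library; chunked is a hypothetical helper, and the batch size of 2 is only for demonstration.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most size items from iterable."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


documents = ({"n": i} for i in range(5))
for batch in chunked(documents, 2):
    print(batch)
# [{'n': 0}, {'n': 1}]
# [{'n': 2}, {'n': 3}]
# [{'n': 4}]
```

Each batch could then be passed to collection.insert_many(batch), or wrapped in InsertOne operations for bulk_write.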
Real-World Example: Building a Simple Contact Manager
Let's put everything together to build a simple contact manager application:
import pymongo
import datetime
from pprint import pprint


class ContactManager:
    def __init__(self, connection_string="mongodb://localhost:27017/"):
        self.client = pymongo.MongoClient(connection_string)
        self.db = self.client["contact_manager"]
        self.contacts = self.db["contacts"]
        # Create indexes
        self.contacts.create_index("email", unique=True)
        self.contacts.create_index([("name", "text")])

    def add_contact(self, name, email, phone=None, address=None, notes=None):
        try:
            contact = {
                "name": name,
                "email": email,
                "phone": phone,
                "address": address,
                "notes": notes,
                "created_at": datetime.datetime.now(),
                "updated_at": datetime.datetime.now()
            }
            result = self.contacts.insert_one(contact)
            return result.inserted_id
        except pymongo.errors.DuplicateKeyError:
            print(f"A contact with email {email} already exists.")
            return None

    def find_contact_by_email(self, email):
        return self.contacts.find_one({"email": email})

    def search_contacts(self, query):
        return list(self.contacts.find({"$text": {"$search": query}}))

    def update_contact(self, email, updates):
        updates["updated_at"] = datetime.datetime.now()
        result = self.contacts.update_one(
            {"email": email},
            {"$set": updates}
        )
        return result.modified_count

    def delete_contact(self, email):
        result = self.contacts.delete_one({"email": email})
        return result.deleted_count

    def list_all_contacts(self):
        return list(self.contacts.find().sort("name"))


# Example usage
def main():
    manager = ContactManager()
    # Add some contacts
    manager.add_contact(
        "John Smith",
        "[email protected]",
        "555-123-4567",
        {"street": "123 Main St", "city": "Boston", "state": "MA", "zip": "02115"},
        "Met at tech conference"
    )
    manager.add_contact(
        "Sarah Johnson",
        "[email protected]",
        "555-987-6543"
    )
    # Search for a contact
    contact = manager.find_contact_by_email("[email protected]")
    if contact:
        print("\nFound contact:")
        pprint(contact)
    # Update a contact
    manager.update_contact(
        "[email protected]",
        {"phone": "555-111-2222", "notes": "Updated contact info"}
    )
    # Search contacts
    results = manager.search_contacts("Smith")
    print(f"\nFound {len(results)} contacts matching 'Smith'")
    # List all contacts
    all_contacts = manager.list_all_contacts()
    print(f"\nAll contacts ({len(all_contacts)}):")
    for contact in all_contacts:
        print(f"- {contact['name']} ({contact['email']})")
    # Clean up (optional)
    # manager.delete_contact("[email protected]")
    # manager.delete_contact("[email protected]")


if __name__ == "__main__":
    main()
This example demonstrates:
- Creating a class to handle database operations
- Using indexes for better performance
- Error handling (e.g., duplicate emails)
- Various CRUD operations in a real-world context
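One detail worth knowing about the timestamps in this example: MongoDB stores dates in UTC, and PyMongo treats naive datetime values as if they were already UTC, so datetime.datetime.now() (local time) can be stored with an offset error. A minimal sketch of timezone-aware timestamps; utc_now and with_timestamp are hypothetical helpers, not part of the ContactManager above.

```python
import datetime


def utc_now():
    """Current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def with_timestamp(updates):
    """Return a copy of updates with updated_at set, leaving the input untouched."""
    return {**updates, "updated_at": utc_now()}


changes = {"phone": "555-111-2222"}
stamped = with_timestamp(changes)
print(sorted(stamped))          # ['phone', 'updated_at']
print("updated_at" in changes)  # False
```

Returning a copy also avoids modifying the dictionary the caller passed in, which the update_contact method above does.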
Best Practices
1. Connection Management
# Use with statement for auto-cleanup
with pymongo.MongoClient("mongodb://localhost:27017/") as client:
    db = client.mydatabase
    # Do operations
# Connection is automatically closed when exiting the with block
2. Error Handling
try:
    result = collection.insert_one({"_id": 1, "name": "Test"})
    print("Insert successful")
    # This will cause a duplicate key error
    result = collection.insert_one({"_id": 1, "name": "Another Test"})
except pymongo.errors.DuplicateKeyError:
    print("Document with this ID already exists")
except pymongo.errors.ConnectionFailure:
    print("Failed to connect to MongoDB")
except Exception as e:
    print(f"An error occurred: {e}")
3. Projections
To limit the fields returned in query results:
# Return only name and email fields
customer = collection.find_one(
    {"name": "John Smith"},
    {"name": 1, "email": 1, "_id": 0}
)
print(customer) # Only contains name and email
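A projection document either includes fields (1) or excludes them (0). The two styles cannot be mixed, with _id as the one exception, which is why the example above can list fields to include and still set "_id" to 0. A small sketch of building such a projection from a list of field names; include_fields is a hypothetical helper.

```python
def include_fields(fields, keep_id=False):
    """Build an inclusion projection; _id is dropped unless keep_id is True."""
    projection = {name: 1 for name in fields}
    if not keep_id:
        projection["_id"] = 0
    return projection


print(include_fields(["name", "email"]))
# {'name': 1, 'email': 1, '_id': 0}
```

The result can be passed as the second argument to find or find_one.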
4. Connection Pooling
PyMongo automatically handles connection pooling. Configure it as needed:
client = pymongo.MongoClient(
    "mongodb://localhost:27017/",
    maxPoolSize=50,  # Maximum connections in pool
    waitQueueTimeoutMS=2000  # Wait time if no connection is available
)
Summary
In this tutorial, we covered:
- Installation and connection: How to install PyMongo and connect to MongoDB databases
- CRUD operations: Creating, reading, updating, and deleting documents
- Collection operations: Managing collections and creating indexes
- Advanced features: Aggregation, text search, and bulk operations
- Real-world application: Building a contact manager
- Best practices: Connection management, error handling, and performance tips
PyMongo provides a robust and intuitive way to work with MongoDB in Python applications. By understanding the basics covered in this tutorial, you'll be able to build applications that efficiently store and retrieve data using MongoDB.
Additional Resources and Exercises
Additional Resources
- Official PyMongo Documentation
- MongoDB University - Free courses on MongoDB
- MongoDB Python Driver GitHub Repository
Exercises
- Library Management System
  - Create a simple system to manage books in a library
  - Implement functions to add books, check out books, return books, and search for books
- Blog Platform Backend
  - Create collections for users, posts, and comments
  - Implement functions for user registration, posting, commenting, and retrieving posts
- Aggregation Challenge
  - Create a dataset of products with categories and prices
  - Use the aggregation pipeline to calculate average price per category, find the most expensive product, and group products by price range
- Performance Optimization
  - Create a collection with thousands of documents (you can generate random data)
  - Create appropriate indexes and compare query performance with and without indexes
- Data Migration Script
  - Write a script that would migrate data from one schema to another
  - For example, split a user collection into separate user and address collections
These exercises will help you gain practical experience with PyMongo and MongoDB concepts.