MongoDB Python Driver

MongoDB is a popular NoSQL database that stores data in flexible, JSON-like documents. To work with it from Python, you need a database driver: a library that lets your application connect to the database and run operations against it. The official MongoDB driver for Python is PyMongo.

Introduction to PyMongo

PyMongo provides a straightforward way to interact with MongoDB databases from Python applications. Whether you're building a web application, a data analysis tool, or any other Python project that stores and retrieves data, PyMongo covers connecting to a server, reading and writing documents, indexing, and aggregation.

In this tutorial, we'll cover:

  • Installing PyMongo
  • Connecting to MongoDB
  • Basic CRUD operations (Create, Read, Update, Delete)
  • Working with documents and collections
  • Advanced operations and best practices

Getting Started

Installing PyMongo

Before you can use PyMongo in your Python applications, you'll need to install it. You can do this using pip:

bash
pip install pymongo

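If you need to connect to MongoDB Atlas (MongoDB's cloud service) with a mongodb+srv:// connection string, install PyMongo with the srv extra, which makes sure the dnspython package is available. The quotes keep shells such as zsh from interpreting the square brackets:

bash
pip install "pymongo[srv]"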

Connecting to MongoDB

To connect to a MongoDB database, you'll first need to create a client that connects to your MongoDB server:

python
import pymongo

# Connect to MongoDB running on localhost
client = pymongo.MongoClient("mongodb://localhost:27017/")

# Alternatively, connect to MongoDB Atlas
# client = pymongo.MongoClient("mongodb+srv://<username>:<password>@<cluster-url>/test")

# Access a database
db = client["mydatabase"]

# Access a collection
collection = db["customers"]

# Check connection
try:
    client.admin.command('ping')
    print("Connected successfully!")
except Exception as e:
    print(f"Connection failed: {e}")

In the code above:

  1. We create a MongoClient instance that connects to MongoDB running on localhost.
  2. We access a database called "mydatabase". MongoDB creates it lazily, the first time data is written to it.
  3. We access a collection called "customers" within that database.
  4. We perform a ping command to verify the connection.
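
Usernames and passwords placed in a connection string must be percent-escaped if they contain characters such as @, : or /. The sketch below shows one way to assemble an Atlas-style URI using only the standard library. The helper name build_mongo_uri, the host name, and the credentials are illustrative assumptions, not part of PyMongo.

```python
from urllib.parse import quote_plus


def build_mongo_uri(username, password, host, default_db="test", srv=True):
    """Return a MongoDB connection string with percent-escaped credentials.

    Hypothetical helper for illustration; PyMongo only needs the final string.
    """
    scheme = "mongodb+srv" if srv else "mongodb"
    # Escape reserved characters so they are not mistaken for URI delimiters.
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"{scheme}://{user}:{pwd}@{host}/{default_db}"


# Made-up credentials and host, used only to show the escaping.
uri = build_mongo_uri("app_user", "p@ss:w/rd", "cluster0.example.mongodb.net")
print(uri)
```

The resulting string can then be passed to pymongo.MongoClient in place of the hard-coded URI shown earlier.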

Basic CRUD Operations

Creating Documents

In MongoDB, data is stored in documents, which are organized into collections. Let's see how to insert documents into a collection:

python
# Insert a single document
customer = {
    "name": "John Smith",
    "email": "[email protected]",
    "age": 30,
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "state": "NY",
        "zip": "10001"
    },
    "active": True
}

result = collection.insert_one(customer)
print(f"Inserted document with ID: {result.inserted_id}")

# Insert multiple documents
customers = [
    {
        "name": "Jane Doe",
        "email": "[email protected]",
        "age": 25,
        "active": True
    },
    {
        "name": "Mike Johnson",
        "email": "[email protected]",
        "age": 35,
        "active": False
    }
]

result = collection.insert_many(customers)
print(f"Inserted {len(result.inserted_ids)} documents")
print(f"IDs: {result.inserted_ids}")

Output:

Inserted document with ID: 64f5a7d2e131d7b8c9a1e2f3
Inserted 2 documents
IDs: [ObjectId('64f5a7d2e131d7b8c9a1e2f4'), ObjectId('64f5a7d2e131d7b8c9a1e2f5')]

Reading Documents

To retrieve documents from a collection, you can use the find and find_one methods:

python
# Find a single document (returns None when nothing matches)
customer = collection.find_one({"name": "John Smith"})
if customer:
    print(f"Found customer: {customer['name']}, Email: {customer['email']}")
else:
    print("Customer not found")

# Find multiple documents (returns a cursor you can iterate over)
active_customers = collection.find({"active": True})
print("Active customers:")
for customer in active_customers:
    print(f"- {customer['name']} ({customer['email']})")

# Find with query operators
young_customers = collection.find({"age": {"$lt": 30}})
print("\nCustomers younger than 30:")
for customer in young_customers:
    print(f"- {customer['name']} (Age: {customer['age']})")

Output:

Found customer: John Smith, Email: [email protected]
Active customers:
- John Smith ([email protected])
- Jane Doe ([email protected])

Customers younger than 30:
- Jane Doe (Age: 25)
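
To make the meaning of a filter such as {"age": {"$lt": 30}} more concrete, here is a toy in-memory matcher written in plain Python. It is only a rough approximation for illustration: it handles top-level fields, equality, and a handful of operators, and it treats documents that lack a field as non-matching. The function name matches and the sample data are assumptions; MongoDB evaluates queries on the server with richer semantics.

```python
import operator

# Comparison operators supported by this toy matcher.
_COMPARISONS = {
    "$lt": operator.lt,
    "$lte": operator.le,
    "$gt": operator.gt,
    "$gte": operator.ge,
}


def matches(document, query):
    """Return True if a dict satisfies a small subset of MongoDB filter syntax."""
    for field, condition in query.items():
        if field not in document:
            return False  # simplification: a missing field never matches
        value = document[field]
        if isinstance(condition, dict):
            # Operator form, e.g. {"$gte": 25, "$lt": 35}; all must hold.
            for op, operand in condition.items():
                if op == "$in":
                    if value not in operand:
                        return False
                elif op in _COMPARISONS:
                    if not _COMPARISONS[op](value, operand):
                        return False
                else:
                    raise ValueError(f"Unsupported operator: {op}")
        elif value != condition:
            # Plain value form, e.g. {"active": True}, means equality.
            return False
    return True


sample_customers = [
    {"name": "John Smith", "age": 30, "active": True},
    {"name": "Jane Doe", "age": 25, "active": True},
    {"name": "Mike Johnson", "age": 35, "active": False},
]

young = [c["name"] for c in sample_customers if matches(c, {"age": {"$lt": 30}})]
print(young)
```

Several conditions in one filter are combined with a logical AND, which is why the loop returns False as soon as any condition fails.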

Updating Documents

To modify existing documents, you can use the update_one and update_many methods:

python
import datetime

# Update a single document
result = collection.update_one(
    {"name": "John Smith"},
    # Use a timezone-aware UTC timestamp; MongoDB stores dates in UTC
    {"$set": {"age": 31, "last_updated": datetime.datetime.now(datetime.timezone.utc)}}
)
print(f"Modified {result.modified_count} document(s)")

# Update multiple documents
result = collection.update_many(
    {"active": True},
    {"$set": {"account_type": "standard"}}
)
print(f"Modified {result.modified_count} document(s)")

# Upsert (insert if not exists)
result = collection.update_one(
    {"email": "[email protected]"},
    {"$set": {"name": "Robert Brown", "age": 40, "active": True}},
    upsert=True
)
if result.upserted_id:
    print(f"Inserted new document with ID: {result.upserted_id}")
else:
    print(f"Modified {result.modified_count} document(s)")

Output:

Modified 1 document(s)
Modified 2 document(s)
Inserted new document with ID: 64f5a7d2e131d7b8c9a1e2f6
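
When many updates should also stamp a modification time, it can help to build the $set document in one place. The following is a minimal sketch under that assumption; build_set_update and the last_updated field name are hypothetical, and the function copies the caller's dictionary rather than mutating it.

```python
import datetime


def build_set_update(changes, now=None):
    """Return a {"$set": ...} update document with a UTC timestamp added.

    Hypothetical helper: the input dict is copied, never modified.
    """
    timestamp = now or datetime.datetime.now(datetime.timezone.utc)
    return {"$set": {**changes, "last_updated": timestamp}}


changes = {"age": 31}
fixed_time = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
update = build_set_update(changes, now=fixed_time)
print(update)
print(changes)  # the original dict is unchanged
```

The returned dictionary can be passed as the second argument to update_one or update_many.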

Deleting Documents

To remove documents from a collection, you can use the delete_one and delete_many methods:

python
# Delete a single document
result = collection.delete_one({"name": "Mike Johnson"})
print(f"Deleted {result.deleted_count} document(s)")

# Delete multiple documents
result = collection.delete_many({"active": False})
print(f"Deleted {result.deleted_count} document(s)")

# Delete all documents
# result = collection.delete_many({})
# print(f"Deleted {result.deleted_count} document(s)")

Output:

Deleted 1 document(s)
Deleted 0 document(s)

Working with Collections

Checking if a Collection Exists

python
collections = db.list_collection_names()
if "customers" in collections:
    print("Customers collection exists")

Creating Indexes

Indexes improve query performance. Here's how to create them:

python
# Create a single field index
collection.create_index("email", unique=True)

# Create a compound index
collection.create_index([("name", pymongo.ASCENDING), ("age", pymongo.DESCENDING)])

# View indexes
indexes = collection.index_information()
print("Collection indexes:")
for index_name, index_info in indexes.items():
    print(f"- {index_name}: {index_info}")

Dropping Collections and Databases

python
# Drop a collection
db.drop_collection("test_collection")

# Drop a database
# client.drop_database("test_database")

Advanced Operations

Aggregation Pipeline

The aggregation pipeline is a powerful tool for data analysis and transformation:

python
# Example: Group customers by age and count them
pipeline = [
    {"$match": {"active": True}},                       # keep only active customers
    {"$group": {"_id": "$age", "count": {"$sum": 1}}},  # one group per age, counting members
    {"$sort": {"count": -1}}                            # largest groups first
]

results = collection.aggregate(pipeline)
print("Customer counts by age:")
for result in results:
    print(f"Age {result['_id']}: {result['count']} customer(s)")
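
To see what the three stages compute, here is a rough plain-Python equivalent that runs on an in-memory list. It is only an illustration of the logic: the sample data (including the hypothetical fourth customer) is made up, and a real pipeline runs on the MongoDB server rather than in your application.

```python
from collections import Counter

# Made-up sample data mirroring the shape of the customers collection.
sample_customers = [
    {"name": "John Smith", "age": 30, "active": True},
    {"name": "Jane Doe", "age": 25, "active": True},
    {"name": "Mike Johnson", "age": 35, "active": False},
    {"name": "Alex Wilson", "age": 30, "active": True},
]

# $match: keep only active customers.
active = [c for c in sample_customers if c["active"]]

# $group with {"$sum": 1}: count documents per distinct age.
counts = Counter(c["age"] for c in active)

# $sort by count descending: most_common() orders by count, highest first.
results = [{"_id": age, "count": n} for age, n in counts.most_common()]

for result in results:
    print(f"Age {result['_id']}: {result['count']} customer(s)")
# Age 30: 2 customer(s)
# Age 25: 1 customer(s)
```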

Text Search

You can create text indexes for efficient text search:

python
# Create a text index
collection.create_index([("name", "text"), ("email", "text")])

# Perform a text search
results = collection.find({"$text": {"$search": "Smith"}})
print("Search results for 'Smith':")
for result in results:
    print(f"- {result['name']} ({result['email']})")

Bulk Operations

For better performance when performing multiple operations:

python
from pymongo import InsertOne, UpdateOne, DeleteOne

# Initialize bulk operations
bulk_operations = [
    InsertOne({"name": "Alex Wilson", "email": "[email protected]", "age": 28}),
    UpdateOne({"name": "John Smith"}, {"$set": {"age": 32}}),
    DeleteOne({"name": "Jane Doe"})
]

# Execute bulk operations
result = collection.bulk_write(bulk_operations)
print(f"Bulk operation results: {result.bulk_api_result}")
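
When the documents come from a large file or generator, one common pattern is to send them in fixed-size batches so that only one batch is held in memory at a time. The sketch below shows a generic batching helper built on the standard library; the name chunked and the batch size are assumptions for illustration.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Generate some made-up documents lazily and batch them.
documents = ({"n": i} for i in range(5))
batches = list(chunked(documents, 2))
print([len(batch) for batch in batches])
# [2, 2, 1]
```

Each batch could then be written with collection.insert_many(batch), or converted to a list of InsertOne operations for bulk_write.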

Real-World Example: Building a Simple Contact Manager

Let's put everything together to build a simple contact manager application:

python
import pymongo
import datetime
from pprint import pprint

class ContactManager:
    def __init__(self, connection_string="mongodb://localhost:27017/"):
        self.client = pymongo.MongoClient(connection_string)
        self.db = self.client["contact_manager"]
        self.contacts = self.db["contacts"]

        # Create indexes
        self.contacts.create_index("email", unique=True)
        self.contacts.create_index([("name", "text")])

    def add_contact(self, name, email, phone=None, address=None, notes=None):
        try:
            contact = {
                "name": name,
                "email": email,
                "phone": phone,
                "address": address,
                "notes": notes,
                "created_at": datetime.datetime.now(),
                "updated_at": datetime.datetime.now()
            }
            result = self.contacts.insert_one(contact)
            return result.inserted_id
        except pymongo.errors.DuplicateKeyError:
            print(f"A contact with email {email} already exists.")
            return None

    def find_contact_by_email(self, email):
        return self.contacts.find_one({"email": email})

    def search_contacts(self, query):
        return list(self.contacts.find({"$text": {"$search": query}}))

    def update_contact(self, email, updates):
        updates["updated_at"] = datetime.datetime.now()
        result = self.contacts.update_one(
            {"email": email},
            {"$set": updates}
        )
        return result.modified_count

    def delete_contact(self, email):
        result = self.contacts.delete_one({"email": email})
        return result.deleted_count

    def list_all_contacts(self):
        return list(self.contacts.find().sort("name"))

# Example usage
def main():
    manager = ContactManager()

    # Add some contacts
    manager.add_contact(
        "John Smith",
        "[email protected]",
        "555-123-4567",
        {"street": "123 Main St", "city": "Boston", "state": "MA", "zip": "02115"},
        "Met at tech conference"
    )

    manager.add_contact(
        "Sarah Johnson",
        "[email protected]",
        "555-987-6543"
    )

    # Search for a contact
    contact = manager.find_contact_by_email("[email protected]")
    if contact:
        print("\nFound contact:")
        pprint(contact)

    # Update a contact
    manager.update_contact(
        "[email protected]",
        {"phone": "555-111-2222", "notes": "Updated contact info"}
    )

    # Search contacts
    results = manager.search_contacts("Smith")
    print(f"\nFound {len(results)} contacts matching 'Smith'")

    # List all contacts
    all_contacts = manager.list_all_contacts()
    print(f"\nAll contacts ({len(all_contacts)}):")
    for contact in all_contacts:
        print(f"- {contact['name']} ({contact['email']})")

    # Clean up (optional)
    # manager.delete_contact("[email protected]")
    # manager.delete_contact("[email protected]")

if __name__ == "__main__":
    main()

This example demonstrates:

  • Creating a class to handle database operations
  • Using indexes for better performance
  • Error handling (e.g., duplicate emails)
  • Various CRUD operations in a real-world context

Best Practices

1. Connection Management

python
# Use with statement for auto-cleanup
with pymongo.MongoClient("mongodb://localhost:27017/") as client:
    db = client.mydatabase
    # Do operations
# Connection is automatically closed when exiting the with block

2. Error Handling

python
try:
    result = collection.insert_one({"_id": 1, "name": "Test"})
    print("Insert successful")

    # This will cause a duplicate key error
    result = collection.insert_one({"_id": 1, "name": "Another Test"})
except pymongo.errors.DuplicateKeyError:
    print("Document with this ID already exists")
except pymongo.errors.ConnectionFailure:
    print("Failed to connect to MongoDB")
except Exception as e:
    print(f"An error occurred: {e}")
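
For transient failures such as a dropped connection, some applications wrap an operation in a small retry loop with exponential backoff. The sketch below is one possible shape for such a helper, not a PyMongo feature. The name with_retries and its parameters are assumptions, and the demo uses Python's built-in ConnectionError with a fake operation so that it runs without a database. In real code you might pass pymongo.errors.ConnectionFailure as retry_on.

```python
import time


def with_retries(operation, retry_on, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call operation(); on a retry_on exception, wait and try again.

    The delay doubles after each failure. The final failure is re-raised.
    """
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Demo with a fake operation that fails twice before succeeding.
calls = {"count": 0}


def flaky_operation():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated network failure")
    return "ok"


delays = []  # record the requested delays instead of actually sleeping
outcome = with_retries(flaky_operation, ConnectionError, sleep=delays.append)
print(outcome, delays)
# ok [0.5, 1.0]
```

Retrying is only safe for operations that can be repeated without side effects. Repeating an insert, for example, could create a duplicate unless a unique index prevents it.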

3. Projections

To limit the fields returned in query results:

python
# Return only name and email fields
customer = collection.find_one(
    {"name": "John Smith"},
    {"name": 1, "email": 1, "_id": 0}  # 1 includes a field, 0 excludes it
)
print(customer)  # Only contains name and email

4. Connection Pooling

PyMongo automatically handles connection pooling. Configure it as needed:

python
client = pymongo.MongoClient(
    "mongodb://localhost:27017/",
    maxPoolSize=50,          # Maximum connections in pool
    waitQueueTimeoutMS=2000  # Wait time (ms) if no connection is available
)

Summary

In this tutorial, we covered:

  1. Installation and connection: How to install PyMongo and connect to MongoDB databases
  2. CRUD operations: Creating, reading, updating, and deleting documents
  3. Collection operations: Managing collections and creating indexes
  4. Advanced features: Aggregation, text search, and bulk operations
  5. Real-world application: Building a contact manager
  6. Best practices: Connection management, error handling, projections, and connection pooling

PyMongo provides a robust and intuitive way to work with MongoDB in Python applications. By understanding the basics covered in this tutorial, you'll be able to build applications that efficiently store and retrieve data using MongoDB.

Additional Resources and Exercises

Exercises

  1. Library Management System

    • Create a simple system to manage books in a library
    • Implement functions to add books, check out books, return books, and search for books
  2. Blog Platform Backend

    • Create collections for users, posts, and comments
    • Implement functions for user registration, posting, commenting, and retrieving posts
  3. Aggregation Challenge

    • Create a dataset of products with categories and prices
    • Use the aggregation pipeline to calculate average price per category, find the most expensive product, and group products by price range
  4. Performance Optimization

    • Create a collection with thousands of documents (you can generate random data)
    • Create appropriate indexes and compare query performance with and without indexes
  5. Data Migration Script

    • Write a script that would migrate data from one schema to another
    • For example, split a user collection into separate user and address collections

These exercises will help you gain practical experience with PyMongo and MongoDB concepts.
