Skip to main content

PyTorch Horovod Integration

Introduction

Distributed training is becoming increasingly important as deep learning models grow larger and datasets become more extensive. In this tutorial, we'll explore how to integrate Horovod with PyTorch, a popular combination for efficient distributed training.

Horovod is an open-source distributed deep learning training framework developed by Uber. It was designed to make distributed training easy to use and highly efficient, especially for PyTorch and TensorFlow users. Horovod uses the Message Passing Interface (MPI) standard to coordinate work between different processes and leverages efficient communication libraries like NCCL for GPU-to-GPU communication.

By the end of this tutorial, you'll understand how Horovod works with PyTorch and be able to implement distributed training for your own deep learning models.

Prerequisites

Before we begin, make sure you have:

  • Basic knowledge of PyTorch
  • PyTorch installed
  • Horovod installed (we'll cover installation steps)
  • Multiple GPUs (optional but recommended for testing)

Installing Horovod

Let's start by installing Horovod and its dependencies:

bash
# Install MPI
# For Ubuntu:
sudo apt-get install openmpi-bin openmpi-common libopenmpi-dev

# Install Horovod with PyTorch support
pip install horovod[pytorch]

To verify your installation, run:

bash
horovodrun --check-build

You should see output indicating that PyTorch support is enabled.

Understanding Horovod's Key Concepts

Before diving into code, let's understand some key Horovod concepts:

  1. Rank: The unique ID assigned to each process in the distributed training setup
  2. Size: Total number of processes
  3. Local Rank: The relative rank of the process on the local machine
  4. Allreduce: The core operation in Horovod that averages gradients across all processes

Basic Horovod Integration with PyTorch

Let's start with a simple example to see how Horovod integrates with PyTorch:

python
import torch
import horovod.torch as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

# Define model, loss, and optimizer
model = torch.nn.Sequential(
torch.nn.Linear(784, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 10)
).cuda()

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Wrap the optimizer with Horovod's Distributed Optimizer
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters()
)

# Broadcast parameters from rank 0 to all other processes
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

This code snippet demonstrates the key integration points:

  1. Initialize Horovod with hvd.init()
  2. Set each process to use a different GPU with torch.cuda.set_device(hvd.local_rank())
  3. Scale the learning rate by the number of processes
  4. Wrap PyTorch's optimizer with Horovod's distributed optimizer
  5. Broadcast the model parameters and optimizer state from the root process to all others

Complete PyTorch Training Example with Horovod

Now, let's create a complete training example using MNIST:

python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import horovod.torch as hvd
from torchvision import datasets, transforms

# Initialize Horovod
hvd.init()

# Pin GPU to local rank
torch.cuda.set_device(hvd.local_rank())

# Define dataset and dataloader
train_dataset = datasets.MNIST(
'data-mnist',
train=True,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
)

# Partition dataset among workers using DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=64, sampler=train_sampler)

# Simple CNN model
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)

def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output

model = Net().cuda()

# Scale learning rate by the number of workers
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size(), momentum=0.5)

# Horovod: wrap optimizer with DistributedOptimizer
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters())

# Broadcast parameters from rank 0 to all other processes
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Training function
def train(epoch):
model.train()
# Set the sampler epoch for proper shuffling
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()

if batch_idx % 10 == 0 and hvd.rank() == 0:
print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_sampler)} '
f'({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')

# Train for 4 epochs
for epoch in range(1, 5):
train(epoch)

# Save model on root process
if hvd.rank() == 0:
torch.save(model.state_dict(), "mnist_cnn.pt")
print("Model saved")

When you run this code with Horovod, each process will:

  1. Load a different portion of the training data
  2. Compute forward and backward passes independently
  3. Average the gradients with all other processes
  4. Apply the averaged gradients to update the model

Running Horovod Training Jobs

To run your Horovod training script, use the horovodrun command:

bash
# Run on a single machine with 4 GPUs
horovodrun -np 4 python train_mnist_hvd.py

# Run on multiple nodes (example with 2 nodes, 4 GPUs each)
horovodrun -np 8 -H hostname1:4,hostname2:4 python train_mnist_hvd.py

The -np flag specifies the total number of processes, while the -H flag specifies the hostnames and number of processes per host.

Performance Optimization Tips

To get the best performance with Horovod and PyTorch:

  1. Batch Size: Increase batch size (per GPU) to maximize GPU utilization
  2. NCCL: Make sure you use NCCL as the communication backend (recommended for GPU training)
  3. Gradient Compression: Use Horovod's gradient compression to reduce communication overhead:
python
# Enable gradient compression
compression = hvd.Compression.fp16 # or hvd.Compression.none
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
compression=compression
)
  1. Fused Optimizers: Use Horovod's fused optimizers for better performance:
python
from horovod.torch.optimizer import FusedAdagrad

optimizer = FusedAdagrad(model.parameters(), lr=0.01 * hvd.size())

Debugging Horovod Training

When working with distributed training, debugging can be challenging. Here are some tips:

  1. Print debug information with rank labels:
python
print(f"[Rank {hvd.rank()}] Debug info: {some_variable}")
  1. Use Horovod's timeline feature to analyze performance:
python
# Set the timeline file name
hvd.timeline_start_step(0) # Start timeline collection
hvd.timeline_end_step(100) # Collect up to step 100
# Run your training

The timeline JSON file can be loaded in Chrome's tracing tool (chrome://tracing/).

Real-World Application: Distributed Image Classification

Let's extend our example to a more realistic scenario using the ImageNet dataset and ResNet model:

python
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import horovod.torch as hvd
import os

# Initialize Horovod
hvd.init()
torch.cuda.set_device(hvd.local_rank())

# Data augmentation and normalization
transform_train = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

# In a real scenario, you'd point to your ImageNet data directory
# For this example, we'll use a subset of CIFAR-10 instead
train_dataset = torchvision.datasets.CIFAR10(
root='./data', train=True, download=True, transform=transform_train)

# Partition dataset among workers
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())

train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=32, sampler=train_sampler, num_workers=4)

# Create model - using ResNet18 for demonstration
model = torchvision.models.resnet18(pretrained=False)
model.cuda()

# Configure optimizer
optimizer = optim.SGD(model.parameters(),
lr=0.1 * hvd.size(),
momentum=0.9,
weight_decay=5e-4)

# Horovod distributed optimizer
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
compression=hvd.Compression.fp16
)

# Broadcast parameters & optimizer state from rank 0
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
optimizer, milestones=[30, 60, 80], gamma=0.1)

# Loss function
criterion = nn.CrossEntropyLoss().cuda()

# Training loop
def train(epoch):
model.train()
train_sampler.set_epoch(epoch)
train_loss = 0
correct = 0
total = 0

for batch_idx, (inputs, targets) in enumerate(train_loader):
inputs, targets = inputs.cuda(), targets.cuda()

# Forward pass
outputs = model(inputs)
loss = criterion(outputs, targets)

# Backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()

# Update statistics
train_loss += loss.item()
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()

# Print progress on rank 0
if batch_idx % 20 == 0 and hvd.rank() == 0:
print(f'Epoch: {epoch} | Batch: {batch_idx}/{len(train_loader)} | '
f'Loss: {train_loss/(batch_idx+1):.3f} | '
f'Acc: {100.*correct/total:.3f}%')

# Train for 5 epochs
for epoch in range(1, 6):
train(epoch)
lr_scheduler.step()

# Save checkpoint (only on rank 0)
if hvd.rank() == 0:
state = {
'model': model.state_dict(),
'epoch': epoch,
}
os.makedirs('checkpoints', exist_ok=True)
torch.save(state, f'checkpoints/resnet_epoch_{epoch}.pt')

This example demonstrates a more complete training setup, including:

  • Data augmentation
  • Learning rate scheduling
  • Training statistics
  • Checkpoint saving (only on rank 0)
  • Gradient compression for communication efficiency

Summary

In this tutorial, we've explored how to integrate Horovod with PyTorch for distributed training:

  1. Installation and Setup: Installing Horovod and basic configuration
  2. Core Concepts: Understanding rank, size, and Horovod operations
  3. Basic Integration: Adding Horovod to existing PyTorch code
  4. Complete Examples: MNIST and a more realistic image classification task
  5. Performance Optimizations: Tips for getting better distributed training performance
  6. Debugging: Techniques to troubleshoot distributed training issues

Horovod makes it relatively straightforward to scale PyTorch training from a single GPU to multiple GPUs across multiple nodes, with minimal code changes. The key modifications involve initializing Horovod, using the distributed optimizer, broadcasting initial parameters, and properly partitioning your dataset.

Additional Resources and Exercises

Resources

Exercises

  1. Basic Exercise: Modify the MNIST example to use a different model architecture.

  2. Intermediate Exercise: Add validation during training and implement early stopping based on validation loss.

  3. Advanced Exercise: Implement mixed precision training (using PyTorch's AMP) with Horovod.

  4. Challenge: Try scaling the training across multiple nodes and analyze the speed-up compared to single-GPU training. Plot training time versus number of GPUs.

  5. Research Project: Experiment with different gradient compression techniques in Horovod and measure their impact on training speed and final model accuracy.

By working through these exercises, you'll gain a deeper understanding of how to effectively use Horovod with PyTorch for distributed deep learning training.



If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)