
TensorFlow Horovod Integration

Introduction

When training complex deep learning models with large datasets, a single GPU often isn't enough. Distributed training allows you to spread your workload across multiple GPUs and even multiple machines, significantly reducing training time. Horovod is a popular distributed deep learning training framework that makes scaling your TensorFlow models simple and efficient.

Developed by Uber and open-sourced in 2017, Horovod implements ring-allreduce for data parallelism, building on Baidu's ring-allreduce work and inspired by Facebook's research on large-minibatch training. It provides a unified way to distribute training across multiple GPUs, multiple machines, or even multiple cloud instances without requiring significant changes to your existing TensorFlow code.

In this tutorial, we'll explore how to integrate Horovod with TensorFlow to accelerate your model training through distributed computing.

Prerequisites

Before we dive in, make sure you have:

  • TensorFlow 2.x installed
  • Basic knowledge of TensorFlow and Keras
  • Access to multiple GPUs (for actual distributed training)
  • Horovod installed (we'll cover installation below)

Installing Horovod

Let's start by installing Horovod with TensorFlow support:

bash
pip install horovod[tensorflow]

If you're using GPUs, make sure your system has CUDA and NCCL installed before you install Horovod. To install Horovod with support for several frameworks at once, you can use:

bash
pip install horovod[tensorflow,keras,pytorch,mxnet]

Basic Horovod Integration with TensorFlow

Step 1: Import the necessary libraries

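First, we need to import TensorFlow and Horovod. Because the following steps use the Keras API (model.compile, model.fit, and Horovod's Keras callbacks), import Horovod's Keras module:

python
import tensorflow as tf
import horovod.tensorflow.keras as hvd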

Step 2: Initialize Horovod

Before any other Horovod operations, we need to initialize it:

python
# Initialize Horovod
hvd.init()

# Pin each process to a single GPU, selected by its local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

This code initializes Horovod and pins each process to a single GPU. The hvd.local_rank() function returns the index of the process on its own machine (0 to N-1 when N processes run per machine), so each process on a machine is assigned a different GPU.

Step 3: Scale the learning rate

When training with multiple GPUs or nodes, the effective batch size increases. To account for this, scale your learning rate based on the number of workers:

python
# Adjust learning rate based on number of GPUs
learning_rate = 0.001 * hvd.size()
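
To make the arithmetic behind this step concrete, here is a minimal sketch that computes the effective (global) batch size and the linearly scaled learning rate for a few worker counts. The helper names effective_batch_size and scaled_learning_rate are hypothetical and not part of Horovod. Linear scaling is a common heuristic rather than a guarantee, so treat the result as a starting point for tuning.

```python
def effective_batch_size(per_worker_batch: int, num_workers: int) -> int:
    """Global batch size when every worker processes its own batch each step."""
    return per_worker_batch * num_workers


def scaled_learning_rate(base_lr: float, num_workers: int) -> float:
    """Linear scaling heuristic: multiply the single-worker rate by the worker count."""
    return base_lr * num_workers


# Example: a per-worker batch of 32 and a base rate of 0.001 (hypothetical values)
for workers in (1, 4, 8):
    batch = effective_batch_size(32, workers)
    lr = scaled_learning_rate(0.001, workers)
    print(f"workers={workers} global_batch={batch} lr={lr:.4f}")
```

In a real script, num_workers would come from hvd.size().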

Step 4: Create a distributed optimizer

Wrap your optimizer with Horovod's distributed optimizer:

python
# Create optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Wrap the optimizer with Horovod's distributed optimizer
optimizer = hvd.DistributedOptimizer(optimizer)

This wrapper averages the gradients across all workers (using allreduce) before they are applied to the model's weights.
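
To give a feel for what gradient averaging with ring-allreduce involves, the following is a single-process simulation in plain Python. It is only a sketch: the function name ring_allreduce_average is hypothetical, the "workers" are just lists in memory, and Horovod itself delegates the real communication to backends such as NCCL, MPI, or Gloo. Each worker's gradient is split into as many chunks as there are workers; a scatter-reduce phase sums the chunks around the ring, and an allgather phase circulates the finished chunks.

```python
from typing import List


def ring_allreduce_average(worker_grads: List[List[float]]) -> List[List[float]]:
    """Simulate ring-allreduce and return the averaged gradient held by each worker."""
    n = len(worker_grads)
    length = len(worker_grads[0])
    if any(len(g) != length for g in worker_grads):
        raise ValueError("all workers must hold gradients of the same length")

    # Chunk c covers indices bounds[c] .. bounds[c + 1] - 1.
    bounds = [(c * length) // n for c in range(n + 1)]
    data = [list(g) for g in worker_grads]  # work on copies

    def chunk(c: int) -> range:
        return range(bounds[c], bounds[c + 1])

    # Phase 1 (scatter-reduce): after n - 1 steps, worker r holds the full sum
    # of chunk (r + 1) % n.
    for step in range(n - 1):
        # Snapshot outgoing chunks so all sends in a step happen "simultaneously".
        outgoing = []
        for rank in range(n):
            c = (rank - step) % n
            outgoing.append((c, [data[rank][i] for i in chunk(c)]))
        for rank, (c, values) in enumerate(outgoing):
            dest = (rank + 1) % n
            for i, v in zip(chunk(c), values):
                data[dest][i] += v

    # Phase 2 (allgather): circulate the fully reduced chunks around the ring.
    for step in range(n - 1):
        outgoing = []
        for rank in range(n):
            c = (rank + 1 - step) % n
            outgoing.append((c, [data[rank][i] for i in chunk(c)]))
        for rank, (c, values) in enumerate(outgoing):
            dest = (rank + 1) % n
            for i, v in zip(chunk(c), values):
                data[dest][i] = v

    # Turn the sums into averages.
    return [[v / n for v in g] for g in data]


grads = [[1.0, 2.0, 3.0, 4.0], [3.0, 4.0, 5.0, 6.0], [5.0, 6.0, 7.0, 8.0]]
print(ring_allreduce_average(grads)[0])  # every worker ends with [3.0, 4.0, 5.0, 6.0]
```

Each worker sends and receives only one chunk per step, which is why the approach uses network bandwidth evenly instead of funnelling all gradients through a single node.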

Step 5: Modify the model compilation

When compiling your model, use the distributed optimizer:

python
model = tf.keras.Sequential([
    tf.keras.layers.Dense(10, activation='relu', input_shape=(20,)),
    tf.keras.layers.Dense(10, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
    metrics=['accuracy'],
    # Makes early TF 2.x releases use hvd.DistributedOptimizer to compute
    # gradients; newer releases may ignore or reject this argument
    experimental_run_tf_function=False
)

Step 6: Add callbacks for broadcasting and checkpointing

For synchronizing the initial model state and saving checkpoints properly:

python
callbacks = [
    # Broadcast initial variables from rank 0 to all processes
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Save checkpoints only on worker 0 to prevent conflicts
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

The BroadcastGlobalVariablesCallback ensures that all workers start with the same model weights by broadcasting the weights from the first worker (rank 0) to all others.

Step 7: Modify the training process

Adjust your training to account for the distributed nature:

python
# Generate dummy training data
import numpy as np
x_train = np.random.random((1000, 20))
y_train = np.random.randint(0, 2, size=(1000, 1))  # binary labels for binary_crossentropy

# Shard the dataset based on worker rank
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shard(hvd.size(), hvd.rank())
train_dataset = train_dataset.batch(32)

model.fit(
    train_dataset,
    epochs=5,
    callbacks=callbacks,
    verbose=1 if hvd.rank() == 0 else 0
)

Notice how we:

  1. Shard the dataset so each worker processes only part of it
  2. Only enable verbose output on the first worker to avoid cluttered logs
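
To see what sharding does to the data, here is a plain-Python sketch that mimics the selection rule of Dataset.shard(num_shards, index): each worker keeps every num_shards-th element, starting at its own index. The shard helper below is a hypothetical stand-in written for illustration; it is not the tf.data implementation.

```python
from typing import List, Sequence


def shard(elements: Sequence[int], num_shards: int, index: int) -> List[int]:
    """Keep every num_shards-th element, starting at position `index`."""
    if not 0 <= index < num_shards:
        raise ValueError("index must be in [0, num_shards)")
    return list(elements[index::num_shards])


samples = list(range(10))
shards = [shard(samples, num_shards=4, index=rank) for rank in range(4)]
for rank, part in enumerate(shards):
    print(f"rank {rank}: {part}")
# rank 0: [0, 4, 8]
# rank 1: [1, 5, 9]
# rank 2: [2, 6]
# rank 3: [3, 7]
```

Note that the shards are disjoint but not necessarily the same size when the dataset length is not a multiple of the worker count, which matters when every worker is expected to run the same number of steps.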

Complete Example with Keras

Here's a full example using Keras with the MNIST dataset:

python
import tensorflow as tf
import horovod.tensorflow.keras as hvd
import numpy as np

# Initialize Horovod
hvd.init()

# Pin GPU to be used
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Load and prepare the MNIST dataset
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Shard the dataset
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))

train_dataset = train_dataset.shard(hvd.size(), hvd.rank())
test_dataset = test_dataset.shard(hvd.size(), hvd.rank())

train_dataset = train_dataset.shuffle(10000).batch(128)
test_dataset = test_dataset.batch(128)

# Create a simple CNN model
model = tf.keras.models.Sequential([
    tf.keras.layers.Reshape((28, 28, 1), input_shape=(28, 28)),
    tf.keras.layers.Conv2D(32, 3, activation='relu'),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Adjust learning rate based on number of GPUs
lr = 0.001 * hvd.size()
opt = tf.keras.optimizers.Adam(learning_rate=lr)

# Wrap optimizer with Horovod's distributed optimizer
opt = hvd.DistributedOptimizer(opt)

model.compile(
    optimizer=opt,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Callbacks for Horovod
callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
]

# Save checkpoints only on worker 0 to prevent conflicts
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

# Train the model
model.fit(
    train_dataset,
    epochs=5,
    callbacks=callbacks,
    validation_data=test_dataset,
    verbose=1 if hvd.rank() == 0 else 0
)

# Evaluate model (only on worker 0). Note that test_dataset was sharded above,
# so this reports metrics for worker 0's shard of the test set only.
if hvd.rank() == 0:
    eval_result = model.evaluate(test_dataset)
    print(f"Test loss: {eval_result[0]}, Test accuracy: {eval_result[1]}")

Running Distributed Training

To run your Horovod-enabled TensorFlow code on multiple GPUs, use the horovodrun command:

bash
# Run on 4 GPUs on the same machine
horovodrun -np 4 python mnist_horovod.py

For multi-node training, you need to specify the hosts:

bash
# Run on 2 machines with 4 GPUs each
horovodrun -np 8 -H server1:4,server2:4 python mnist_horovod.py
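
To illustrate how a host specification such as server1:4,server2:4 relates to the values returned by hvd.rank() and hvd.local_rank(), here is a small sketch that lays out ranks across hosts. The helpers parse_hosts and rank_layout are hypothetical, and the sketch assumes the launcher fills hosts in the order they are listed; the actual placement is decided by the launcher and may differ.

```python
from typing import List, Tuple


def parse_hosts(spec: str) -> List[Tuple[str, int]]:
    """Parse 'host:slots,host:slots' into (host, slots) pairs."""
    hosts = []
    for entry in spec.split(","):
        name, slots = entry.rsplit(":", 1)
        hosts.append((name, int(slots)))
    return hosts


def rank_layout(spec: str) -> List[Tuple[int, str, int]]:
    """Return (rank, host, local_rank) triples, filling hosts in the order given."""
    layout = []
    rank = 0
    for host, slots in parse_hosts(spec):
        for local_rank in range(slots):
            layout.append((rank, host, local_rank))
            rank += 1
    return layout


for rank, host, local_rank in rank_layout("server1:4,server2:4"):
    print(f"rank={rank} host={host} local_rank={local_rank}")
```

Under this assumption, global rank 4 is the first process on server2 and has local rank 0, which is why GPU pinning uses the local rank rather than the global one.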

Alternatively, you can use mpirun if you have MPI installed:

bash
mpirun -np 4 python mnist_horovod.py

Advanced Horovod Features

Elastic Training

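Horovod supports elastic training, which lets workers join or leave during training without restarting the job. With the Keras API, the model is wrapped in a state object that is committed periodically and restored when the set of workers changes, and the training function is decorated with hvd.elastic.run. Elastic jobs are launched with horovodrun using a host discovery script (--host-discovery-script) and optional --min-np and --max-np limits. The following sketch assumes that hvd is horovod.tensorflow.keras and that model, train_dataset, and epochs are defined as in the earlier examples:

python
hvd.init()

# Track model and optimizer state so it can be restored after a worker change
state = hvd.elastic.KerasState(model, batch=0, epoch=0)

callbacks = [
    # Commit state periodically and keep the batch and epoch counters in sync
    hvd.elastic.CommitStateCallback(state),
    hvd.elastic.UpdateBatchStateCallback(state),
    hvd.elastic.UpdateEpochStateCallback(state),
]

@hvd.elastic.run
def train(state):
    model.fit(
        train_dataset,
        epochs=epochs - state.epoch,
        callbacks=callbacks,
        verbose=1 if hvd.rank() == 0 else 0
    )

train(state)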

Using Horovod with TensorFlow's Custom Training Loops

For more control over the training process, you can integrate Horovod with custom training loops:

python
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Create model and optimizer
model = tf.keras.Sequential([
    tf.keras.layers.Dense(10, activation='relu', input_shape=(20,)),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

# Use a plain optimizer here: hvd.DistributedGradientTape below already
# averages the gradients, so the optimizer must not be wrapped as well
optimizer = tf.keras.optimizers.Adam(0.001 * hvd.size())

# Create dataset
x_train = tf.random.normal((100, 20))
y_train = tf.random.uniform((100, 1))
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shard(hvd.size(), hvd.rank())
dataset = dataset.batch(10)

@tf.function
def train_step(x, y, first_batch):
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        # Reduce the per-sample losses to a scalar
        loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(y, predictions))
    # Horovod will take care of aggregating the gradients
    tape = hvd.DistributedGradientTape(tape)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    # Broadcast initial variables after the first step, once the optimizer
    # has created its own variables
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        # In some newer TensorFlow releases, variables is a property rather than a method
        hvd.broadcast_variables(optimizer.variables(), root_rank=0)
    return loss

# Training loop
for epoch in range(3):
    for step, (x, y) in enumerate(dataset):
        loss = train_step(x, y, epoch == 0 and step == 0)
        if hvd.rank() == 0 and step % 10 == 0:
            print(f"Epoch {epoch}, Step {step}, Loss: {loss.numpy()}")

# Save model only on worker 0
if hvd.rank() == 0:
    model.save('horovod_model')

Monitoring and Debugging

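When training with Horovod, it's important to monitor performance across all workers. Horovod Timeline records the communication activity of the job and can be enabled when you launch it, either with the --timeline-filename option of horovodrun or by setting the HOROVOD_TIMELINE environment variable to the output path:

bash
# Record a timeline of Horovod activity to a JSON file
horovodrun -np 4 --timeline-filename /path/to/timeline.json python mnist_horovod.py

This generates a trace file that you can open in Chrome's tracing viewer (chrome://tracing) to visualize the execution timeline.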

Real-world Application: Image Classification with ResNet50

Let's implement a more complete example with a pre-trained ResNet50 model for image classification:

python
import tensorflow as tf
import horovod.tensorflow.keras as hvd
import os

# Initialize Horovod
hvd.init()

# Set up GPU
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# Memory growth needs to be enabled explicitly for TF 2.x
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# Load ImageNet data (using a small subset for demonstration)
batch_size = 32
# Replace with your actual data loading code
# For example, using TensorFlow Datasets:
import tensorflow_datasets as tfds
dataset, info = tfds.load('imagenette', with_info=True, as_supervised=True)
train_dataset = dataset['train']

# Preprocessing function
def preprocess(image, label):
    image = tf.image.resize(image, (224, 224))
    # Apply the same preprocessing the ImageNet weights were trained with
    image = tf.keras.applications.resnet50.preprocess_input(image)
    return image, label

# Shard before shuffling so that workers read disjoint subsets of the data
train_dataset = train_dataset.shard(hvd.size(), hvd.rank())
train_dataset = train_dataset.map(preprocess)
train_dataset = train_dataset.shuffle(10000)
train_dataset = train_dataset.batch(batch_size)
train_dataset = train_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

# Load pre-trained ResNet50 model
base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(1024, activation='relu')(x)
predictions = tf.keras.layers.Dense(10, activation='softmax')(x) # 10 classes for ImageNette
model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

# Freeze the base_model layers
for layer in base_model.layers:
    layer.trainable = False

# Scale learning rate according to the number of workers
scaled_lr = 0.001 * hvd.size()
opt = tf.keras.optimizers.SGD(learning_rate=scaled_lr, momentum=0.9)
opt = hvd.DistributedOptimizer(opt)

model.compile(
    optimizer=opt,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

callbacks = [
    # Horovod callbacks
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
    # Recent Horovod releases require initial_lr; older ones read it from the optimizer
    hvd.callbacks.LearningRateWarmupCallback(
        initial_lr=scaled_lr,
        warmup_epochs=5,
        verbose=1 if hvd.rank() == 0 else 0
    ),
]

# Add checkpointing only on the first worker
if hvd.rank() == 0:
    checkpoint_dir = './checkpoints'
    os.makedirs(checkpoint_dir, exist_ok=True)
    callbacks.append(
        tf.keras.callbacks.ModelCheckpoint(
            filepath=os.path.join(checkpoint_dir, 'model_{epoch}.h5'),
            save_best_only=True,
            monitor='accuracy'
        )
    )

# Train the model
epochs = 10
history = model.fit(
    train_dataset,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1 if hvd.rank() == 0 else 0
)

# Save the final model only on worker 0
if hvd.rank() == 0:
    model.save('resnet50_trained_model')

# Plot training history (only on worker 0, so workers don't all write the same file)
if hvd.rank() == 0:
    import matplotlib.pyplot as plt
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()
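
The warmup callback used in the example above raises the learning rate gradually over the first epochs, which tends to stabilize early training when the scaled rate is large. The sketch below shows one plausible linear ramp from the single-worker rate up to the scaled rate. The function warmup_lr and its exact formula are assumptions made for illustration and may differ from what Horovod's callback computes internally.

```python
def warmup_lr(epoch: float, scaled_lr: float, num_workers: int, warmup_epochs: int) -> float:
    """Linearly ramp from scaled_lr / num_workers to scaled_lr over warmup_epochs."""
    if warmup_epochs <= 0 or epoch >= warmup_epochs:
        return scaled_lr
    base_lr = scaled_lr / num_workers
    fraction = epoch / warmup_epochs
    return base_lr + fraction * (scaled_lr - base_lr)


# Example: 8 workers, scaled rate 0.008, 5 warmup epochs (hypothetical values)
for epoch in range(7):
    print(f"epoch {epoch}: lr={warmup_lr(epoch, 0.008, 8, 5):.5f}")
```

Because epoch is a float, the same function can be evaluated at fractional epochs to adjust the rate per batch instead of per epoch.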

Common Challenges and Solutions

Challenge 1: Slow Performance

If your distributed training is slower than expected:

  1. Check Network Bandwidth: Ensure your network has sufficient bandwidth for gradient communication.
  2. Use NCCL: For multiple GPUs on a single machine, NCCL is typically the fastest communication method.
  3. Optimize Batch Size: Adjust batch size to maximize GPU utilization.

Challenge 2: Memory Errors

If you encounter out-of-memory errors:

  1. Reduce Batch Size: Try a smaller batch size per worker.
  2. Enable Memory Growth: Use tf.config.experimental.set_memory_growth().
  3. Mixed Precision Training: Use TensorFlow's mixed precision to reduce memory usage.
python
# Enable mixed precision
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy('mixed_float16')

Challenge 3: Uneven Load Distribution

If some workers finish much sooner than others:

  1. Avoid Static Sharding: Instead of statically sharding the dataset, use dynamic work allocation.
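  2. Use Dataset.repeat(): Make the dataset repeat indefinitely and set steps_per_epoch in model.fit, so that every worker runs the same number of steps per epoch and no worker waits on one that has run out of data.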
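
As a sketch of the second point, the helper below computes a per-epoch step count that is identical on every worker, so no worker runs out of data while the others are still waiting for it in a collective operation. The function name steps_per_epoch is hypothetical, and the sketch assumes each worker draws per_worker_batch samples per step from a repeating dataset.

```python
def steps_per_epoch(num_samples: int, per_worker_batch: int, num_workers: int) -> int:
    """Number of full steps per epoch that every worker can run in lockstep."""
    if min(num_samples, per_worker_batch, num_workers) <= 0:
        raise ValueError("all arguments must be positive")
    return num_samples // (per_worker_batch * num_workers)


# Example: 60,000 training samples, batch size 128 per worker, 4 workers
print(steps_per_epoch(60000, 128, 4))  # 117
```

The result would typically be passed as steps_per_epoch to model.fit together with a dataset that calls repeat(), with hvd.size() supplying num_workers.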

Summary

In this tutorial, you've learned how to integrate Horovod with TensorFlow for distributed deep learning training across multiple GPUs and nodes. We covered:

  • Basic Horovod setup and initialization
  • Integrating Horovod with Keras models
  • Running custom training loops with Horovod
  • A real-world example with ResNet50
  • Common challenges and solutions

Horovod makes distributed training accessible without requiring major code changes. By following the patterns demonstrated here, you can scale your TensorFlow models efficiently to train on multiple GPUs and nodes.

Exercises

  1. Modify the MNIST example to use mixed precision training to improve performance.
  2. Implement a learning rate scheduler that adjusts the learning rate based on the number of epochs.
  3. Create a distributed training script for a transformer model on a text classification task.
  4. Compare training speed with different communication backends (NCCL, MPI, Gloo) if available.
  5. Extend the ResNet50 example to include validation during training and early stopping.

By mastering Horovod with TensorFlow, you'll be able to train larger models on bigger datasets more efficiently, opening up opportunities for more ambitious deep learning projects.


