HorovodRunner: distributed deep learning with Horovod

HorovodRunner is a general API to run distributed deep learning workloads on Databricks using Uber’s Horovod framework. By integrating Horovod with Spark’s barrier mode, Databricks is able to provide higher stability for long-running deep learning training jobs on Spark. HorovodRunner takes a Python method that contains DL training code with Horovod hooks. This method gets pickled on the driver and sent to Spark workers. A Horovod MPI job is embedded as a Spark job using barrier execution mode. The first executor collects the IP addresses of all task executors using BarrierTaskContext and triggers a Horovod job using mpirun. Each Python MPI process loads the pickled program back, deserializes it, and runs it.


Distributed training with HorovodRunner

HorovodRunner provides the ability to launch Horovod training jobs as Spark jobs. The HorovodRunner API supports the following methods:

init(self, np)

Create an instance of HorovodRunner.

run(self, main, **kwargs)

Run a Horovod training job invoking main(**kwargs). Both the main function and the keyword arguments are serialized using cloudpickle and distributed to cluster workers.

For details, see the HorovodRunner API documentation.

The general approach to developing a distributed training program using HorovodRunner is:

  1. Create a HorovodRunner instance initialized with the number of nodes.
  2. Define a Horovod training method according to the methods described in Horovod usage, making sure to add any import statements inside the method.
  3. Pass the training method to the HorovodRunner instance.

For example:

hr = HorovodRunner(np=2)

def train():
  import tensorflow as tf


To run HorovodRunner on the driver only with n subprocesses, use hr = HorovodRunner(np=-n). For example, if there are 4 GPUs on the driver node, you can choose n up to 4. You can find details about the parameter np in the HorovodRunner API documentation. You can find details about how to pin one GPU per subprocess in the Horovod usage guide.

A common error is that TensorFlow objects cannot be found or pickled. This happens when the library import statements are not being distributed to other executors. To avoid this issue, ensure that all import statements (for example, import tensorflow as tf) are added at both the top of the Horovod training method and inside any other user defined functions referenced in the Horovod training method.

Record Horovod training with Horovod Timeline

Horovod has the ability to record the timeline of its activity, called Horovod Timeline.


Horovod Timeline has a significant impact on performance. Inception3 throughput can decrease by ~40% when Horovod Timeline is enabled. If you want to speed up your HorovodRunner jobs, disable Horovod Timeline.

To record a Horovod Timeline, set the HOROVOD_TIMELINE environment variable to the location of the timeline file to be created. The location should be on shared storage, e.g., using DBFS local file APIs, so that the timeline file can be easily retrieved. For example:

timeline_dir = "/dbfs/ml/horovod-timeline/%s" % uuid.uuid4()
os.environ['HOROVOD_TIMELINE'] = timeline_dir + "/horovod_timeline.json"
hr = HorovodRunner(np=4)
hr.run(run_training_horovod, params=params)

You can download the timeline file using Databricks CLI or FileStore, and then use the Chrome browser’s chrome://tracing facility to view it. For example:

Horovod timeline

Development workflow

To migrate a single node DL to distributed training, the following are the broad steps in the workflow.

  1. Prepare single node DL code: Prepare and test the single node DL code with TensorFlow, Keras, or PyTorch.
  2. Migrate to Horovod: Follow the instructions from Horovod usage to migrate the code with Horovod and test it on the driver:
    1. Add hvd.init() to initialize Horovod.
    2. Pin a server GPU to be used by this process using config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to local rank. In that case, the first process on the server will be allocated the first GPU, second process will be allocated the second GPU and so forth.
    3. Include a shard of the dataset. This dataset operator is very useful when running distributed training, as it allows each worker to read a unique subset.
    4. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
    5. Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
    6. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using MonitoredTrainingSession, you can execute the hvd.broadcast_global_variables operation after global variables have been initialized.
    7. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
  3. Migrate to HorovodRunner: HorovodRunner runs the Horovod training job by invoking a Python function. You must wrap the main training procedure into one function. Then you can test HorovodRunner in local mode and distributed mode.

Update the deep learning libraries

If you upgrade or downgrade TensorFlow, Keras, or PyTorch, you must reinstall Horovod so that it is compiled against the newly installed library. For example, if you want to upgrade TensorFlow, Databricks recommends using the init script from the TensorFlow installation instructions and appending the following TensorFlow specific Horovod installation code to the end of it. See Horovod installation instructions to work with different combinations, such as upgrading or downgrading PyTorch and other libraries.

add-apt-repository -y ppa:ubuntu-toolchain-r/test
apt update
# Using the same compiler that TensorFlow was built to compile Horovod
apt install g++-7 -y
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-7 60 --slave /usr/bin/g++ g++ /usr/bin/g++-7

HOROVOD_GPU_ALLREDUCE=NCCL HOROVOD_CUDA_HOME=/usr/local/cuda pip install horovod==0.18.1 --force-reinstall --no-deps --no-cache-dir


The following examples, based on the MNIST dataset, demonstrate how to migrate a single-node deep learning program to distributed deep learning with HorovodRunner.