HorovodRunner: Distributed Deep Learning with Horovod

HorovodRunner is a general API to run distributed deep learning workloads on Databricks using Uber’s Horovod framework. By integrating Horovod with Spark’s barrier mode, Databricks provides higher stability for long-running deep learning training jobs on Spark. HorovodRunner takes a Python method that contains deep learning training code with Horovod hooks. This method gets pickled on the driver and sent to Spark workers. A Horovod MPI job is embedded as a Spark job using barrier execution mode. The first executor collects the IP addresses of all task executors using BarrierTaskContext and triggers a Horovod job using mpirun. Each Python MPI process loads the pickled program, deserializes it, and runs it.

[Figure: HorovodRunner architecture (horovod-runner.png)]

Requirements

HorovodRunner requires Databricks Runtime 5.0 ML (Beta) or above.

Distributed training with HorovodRunner

HorovodRunner provides the ability to launch Horovod training jobs as Spark jobs. The HorovodRunner API supports the following methods:

init(self, np)
Create an instance of HorovodRunner.
run(self, main, **kwargs)
Run a Horovod training job invoking main(**kwargs). Both the main function and the keyword arguments are serialized using cloudpickle and distributed to cluster workers.

For details, see the HorovodRunner API documentation.

The general approach to developing a distributed training program using HorovodRunner is:

  1. Create a HorovodRunner instance initialized with the number of nodes.
  2. Define a Horovod training method according to the methods described in Horovod usage.
  3. Pass the training method to the HorovodRunner instance.

For example:

# HorovodRunner is available on Databricks Runtime ML; Horovod provides
# per-framework bindings (horovod.tensorflow is used here as an example).
from sparkdl import HorovodRunner
import horovod.tensorflow as hvd

hr = HorovodRunner(np=2)

def train():
  hvd.init()

hr.run(train)

Record Horovod training with Horovod Timeline

Horovod can record the timeline of its activity, called Horovod Timeline.

Important

Horovod Timeline has a significant impact on performance. Inception3 throughput can decrease by ~40% when Horovod Timeline is enabled. If you want to speed up your HorovodRunner jobs, disable Horovod Timeline.

To record a Horovod Timeline, set the HOROVOD_TIMELINE environment variable to the location of the timeline file to be created. This location (timeline_path in the following example) must be on a FUSE mount of blob storage.

import os

# Point Horovod at the timeline file location before launching the job.
os.environ['HOROVOD_TIMELINE'] = params['timeline_path']
hr = HorovodRunner(np=4)
hr.run(run_training_horovod, params=params)
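
The snippet above assumes that params and run_training_horovod are defined elsewhere in your program. A minimal hypothetical setup might look like the following; the DBFS path and the function body are illustrative placeholders, not part of the HorovodRunner API:

# Hypothetical setup for the snippet above; substitute your own values.
params = {'timeline_path': '/dbfs/ml/horovod_timeline.json'}

def run_training_horovod(params):
  import horovod.tensorflow as hvd  # assumes the TensorFlow binding
  hvd.init()
  # ... your Horovod training code, as described in Development workflow below ...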

You can open the timeline file using the chrome://tracing facility of the Chrome browser. For example:

[Figure: MNIST-timeline.png, an example Horovod Timeline viewed in chrome://tracing]

Development workflow

To migrate single-node deep learning code to distributed training, follow these broad steps in the workflow.

  1. Prepare single node DL code: Prepare and test the single node DL code with TensorFlow, Keras, or PyTorch.
  2. Migrate to Horovod: Follow the instructions from Horovod usage to migrate the code with Horovod and test it on the driver (see the sketch after this list):
    1. Add hvd.init() to initialize Horovod.
    2. Pin a server GPU to be used by this process using config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to the local rank. In that case, the first process on the server is allocated the first GPU, the second process is allocated the second GPU, and so forth.
    3. Include a shard of the dataset. The shard operator is very useful when running distributed training, as it allows each worker to read a unique subset of the data.
    4. Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
    5. Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
    6. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using MonitoredTrainingSession, you can execute the hvd.broadcast_global_variables operation after global variables have been initialized.
    7. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
  3. Migrate to HorovodRunner: HorovodRunner runs the Horovod training job by invoking a Python function. You must wrap the main training procedure into one function. Then you can test HorovodRunner in local mode and distributed mode.
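
A minimal sketch illustrating steps 2 and 3 appears below. It uses TensorFlow 1.x-style APIs with horovod.tensorflow; the synthetic data, the toy model, and the /dbfs checkpoint path are illustrative placeholders rather than part of the HorovodRunner API, so substitute your own input pipeline and model.

import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd

def train():
  # Step 2.1: Initialize Horovod.
  hvd.init()

  # Step 2.2: Pin one GPU per process using the local rank.
  config = tf.ConfigProto()
  config.gpu_options.visible_device_list = str(hvd.local_rank())

  # Step 2.3: Shard the dataset so each worker reads a unique subset.
  # (Synthetic data here; replace with your real input pipeline.)
  features = np.random.rand(1000, 10).astype(np.float32)
  labels = np.random.rand(1000, 1).astype(np.float32)
  dataset = (tf.data.Dataset.from_tensor_slices((features, labels))
             .shard(hvd.size(), hvd.rank())
             .batch(32)
             .repeat())
  x, y = dataset.make_one_shot_iterator().get_next()

  # A toy model standing in for your network.
  pred = tf.layers.dense(x, 1)
  loss = tf.losses.mean_squared_error(y, pred)

  # Step 2.4: Scale the learning rate by the number of workers.
  opt = tf.train.GradientDescentOptimizer(0.01 * hvd.size())

  # Step 2.5: Wrap the optimizer so gradients are averaged across workers.
  opt = hvd.DistributedOptimizer(opt)
  global_step = tf.train.get_or_create_global_step()
  train_op = opt.minimize(loss, global_step=global_step)

  # Step 2.6: Broadcast initial variable states from rank 0 to all processes.
  hooks = [hvd.BroadcastGlobalVariablesHook(0),
           tf.train.StopAtStepHook(last_step=100)]

  # Step 2.7: Save checkpoints only on worker 0.
  ckpt_dir = '/dbfs/ml/horovod_demo' if hvd.rank() == 0 else None

  with tf.train.MonitoredTrainingSession(checkpoint_dir=ckpt_dir,
                                         config=config,
                                         hooks=hooks) as sess:
    while not sess.should_stop():
      sess.run(train_op)

Once the training procedure is wrapped in a single function like train, you can pass it to a HorovodRunner instance exactly as in the earlier example, for instance hr = HorovodRunner(np=2) followed by hr.run(train).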

Examples

The examples in this section demonstrate how to use HorovodRunner to perform distributed training using a convolutional neural network model on the MNIST dataset, a large database of handwritten digits, shown below.

[Figure: sample handwritten digits from the MNIST dataset (mnist.png)]

Training a model to predict a digit is commonly used as the “Hello World” of machine learning.

Each example below demonstrates how to migrate a single-machine deep learning program, written with a popular deep learning library, to distributed deep learning with HorovodRunner.