HorovodRunner is a general API to run distributed deep learning workloads on Databricks using Uber’s Horovod framework. By integrating Horovod with Spark’s barrier mode, Databricks is able to provide higher stability for long-running deep learning training jobs on Spark.
HorovodRunner takes a Python method that contains deep learning training code with Horovod hooks. This method gets pickled on the driver and sent to Spark workers. A Horovod MPI job is embedded as a Spark job using barrier execution mode. The first executor collects the IP addresses of all task executors using BarrierTaskContext and triggers a Horovod job using mpirun. Each Python MPI process loads the pickled program, deserializes it, and runs it.
HorovodRunner provides the ability to launch Horovod training jobs as Spark jobs. The HorovodRunner API supports the following methods:

- init(self, np): Create an instance of HorovodRunner.
- run(self, main, **kwargs): Run a Horovod training job invoking main(**kwargs). Both the main function and the keyword arguments are serialized using cloudpickle and distributed to cluster workers.
For details, see the HorovodRunner API documentation.
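Because both the function and its keyword arguments are pickled, hyperparameters can be passed straight through run. The following is a minimal sketch; the train function and its learning_rate parameter are illustrative, not part of the API:

```python
from sparkdl import HorovodRunner

def train(learning_rate=0.1):
    # Imports must be inside the function because its body is pickled
    # and re-executed on each Horovod process.
    import horovod.tensorflow as hvd
    hvd.init()
    # ... build the model and train it with the given learning_rate ...

# np=2 launches the job on two workers; learning_rate is serialized with
# cloudpickle and passed to train(**kwargs) on every process.
hr = HorovodRunner(np=2)
hr.run(train, learning_rate=0.1)
```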
The general approach to developing a distributed training program using HorovodRunner is:

- Create a HorovodRunner instance initialized with the number of nodes.
- Define a Horovod training method according to the methods described in Horovod usage, making sure to add any import statements inside the method.
- Pass the training method to the HorovodRunner instance.

For example:

```python
hr = HorovodRunner(np=2)

def train():
  import tensorflow as tf
  import horovod.tensorflow as hvd
  hvd.init()

hr.run(train)
```
To run HorovodRunner on the driver only with n subprocesses, use hr = HorovodRunner(np=-n). For example, if there are 4 GPUs on the driver node, you can choose n up to 4. You can find details about the parameter np in the HorovodRunner API documentation, and details about how to pin one GPU per subprocess in the Horovod usage guide.
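For instance, a driver-only run with one subprocess per GPU might look like the following sketch, assuming a driver node with 4 GPUs and an existing train function:

```python
# Run on the driver only, with 4 subprocesses (one per driver GPU).
hr = HorovodRunner(np=-4)
hr.run(train)
```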
A common error is that TensorFlow objects cannot be found or pickled. This happens when the library import statements are not distributed to the other executors. To avoid this issue, include all import statements (for example, import tensorflow as tf) both at the top of the Horovod training method and inside any other user-defined functions referenced in the Horovod training method.
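A minimal sketch of that pattern; the build_model helper and its contents are illustrative:

```python
def build_model():
    # Re-import inside every function that runs on the workers, because only
    # the pickled function bodies are shipped to them.
    import tensorflow as tf
    return tf.keras.Sequential([tf.keras.layers.Dense(10)])

def train_hvd():
    import tensorflow as tf
    import horovod.tensorflow.keras as hvd
    hvd.init()
    model = build_model()
    # ... compile the model with a Horovod-wrapped optimizer and fit it ...
```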
Horovod has the ability to record the timeline of its activity, called Horovod Timeline.
Horovod Timeline has a significant impact on performance. Inception3 throughput can decrease by ~40% when Horovod Timeline is enabled. If you want to speed up your HorovodRunner jobs, disable Horovod Timeline.
To record a Horovod Timeline, set the HOROVOD_TIMELINE environment variable to the location of the timeline file to be created. The location should be on shared storage (for example, using DBFS local file APIs) so that the timeline file can be easily retrieved. For example:

```python
import os
import uuid

timeline_dir = "/dbfs/ml/horovod-timeline/%s" % uuid.uuid4()
os.makedirs(timeline_dir)
os.environ['HOROVOD_TIMELINE'] = timeline_dir + "/horovod_timeline.json"
hr = HorovodRunner(np=4)
hr.run(run_training_horovod, params=params)
```
The broad steps in the workflow to migrate single-node deep learning code to distributed training are:
- Prepare single-node DL code: Prepare and test the single-node DL code with TensorFlow, Keras, or PyTorch.
- Migrate to Horovod: Follow the instructions from Horovod usage to migrate the code with Horovod and test it on the driver (a consolidated sketch of the migrated code follows this list):
- Add hvd.init() to initialize Horovod.
- Pin a server GPU to be used by this process using config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to the local rank. In that case, the first process on the server is allocated the first GPU, the second process is allocated the second GPU, and so forth.
- Include a shard of the dataset. This dataset operator is very useful when running distributed training, as it allows each worker to read a unique subset.
- Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
- Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
- Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using MonitoredTrainingSession, you can execute the hvd.broadcast_global_variables operation after global variables have been initialized.
- Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
- Migrate to HorovodRunner: HorovodRunner runs the Horovod training job by invoking a Python function. You must wrap the main training procedure into a single function. Then you can test HorovodRunner in local mode and distributed mode.
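The following is a minimal sketch of such a migrated training function and how it is handed to HorovodRunner, assuming the TensorFlow 1.x MonitoredTrainingSession API; the synthetic data, the single dense layer, and the checkpoint path are illustrative stand-ins, not part of the HorovodRunner API:

```python
from sparkdl import HorovodRunner

def train_hvd(learning_rate=0.01):
    # All imports live inside the function so they run again on each worker process.
    import tensorflow as tf
    import horovod.tensorflow as hvd

    hvd.init()  # initialize Horovod

    # Pin one GPU per process, indexed by local rank.
    config = tf.ConfigProto()
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Synthetic stand-in for a real input pipeline; each worker reads a unique shard.
    dataset = tf.data.Dataset.from_tensor_slices(
        (tf.random_uniform([1024, 10]),
         tf.random_uniform([1024], maxval=10, dtype=tf.int64)))
    dataset = dataset.shard(hvd.size(), hvd.rank()).repeat().batch(32)
    features, labels = dataset.make_one_shot_iterator().get_next()

    logits = tf.layers.dense(features, 10)
    loss = tf.losses.sparse_softmax_cross_entropy(labels, logits)

    # Scale the learning rate by the number of workers and wrap the optimizer.
    opt = hvd.DistributedOptimizer(tf.train.AdamOptimizer(learning_rate * hvd.size()))
    train_op = opt.minimize(loss, global_step=tf.train.get_or_create_global_step())

    hooks = [
        # Broadcast initial variable states from rank 0 so all workers start identically.
        hvd.BroadcastGlobalVariablesHook(0),
        tf.train.StopAtStepHook(last_step=500),
    ]

    # Save checkpoints on worker 0 only, so the other workers cannot corrupt them.
    checkpoint_dir = "/dbfs/ml/mnist-checkpoints" if hvd.rank() == 0 else None

    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as sess:
        while not sess.should_stop():
            sess.run(train_op)

# Test on the driver first (np=-1 means one local subprocess), then distribute.
hr = HorovodRunner(np=-1)
hr.run(train_hvd, learning_rate=0.01)

hr = HorovodRunner(np=2)
hr.run(train_hvd, learning_rate=0.01)
```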
If you upgrade or downgrade TensorFlow, Keras, or PyTorch, you must reinstall Horovod so that it is compiled against the newly installed library. For example, if you want to upgrade TensorFlow, Databricks recommends using the init script from the TensorFlow installation instructions and appending the following TensorFlow specific Horovod installation code to the end of it. See Horovod installation instructions to work with different combinations, such as upgrading or downgrading PyTorch and other libraries.
```bash
add-apt-repository -y ppa:ubuntu-toolchain-r/test
apt update
# Use the same compiler that TensorFlow was built with to compile Horovod
apt install g++-7 -y
update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-7 60 --slave /usr/bin/g++ g++ /usr/bin/g++-7

HOROVOD_GPU_ALLREDUCE=NCCL HOROVOD_CUDA_HOME=/usr/local/cuda pip install horovod==0.18.1 --force-reinstall --no-deps --no-cache-dir
```
The following examples, based on the MNIST dataset, demonstrate how to migrate a single-node deep learning program to distributed deep learning with HorovodRunner.