hyperopt-distributed-ml-training(Python)

Distributed training with Hyperopt and HorovodRunner

Databricks Runtime for Machine Learning includes Hyperopt, a library for ML hyperparameter tuning in Python, and HorovodRunner, a general API to run distributed deep learning workloads on Databricks using the Horovod framework.

Use case: Distributed deep learning workloads in Python for which you want to tune hyperparameters.

In this example notebook

This demo is adapted from the Hyperopt documentation and the HorovodRunner documentation, with minor adjustments.

This guide consists of the following sections:

  • Run distributed training using HorovodRunner
  • Use Hyperopt to tune hyperparameters in the distributed training workflow

Requirements

  • This notebook runs on CPU or GPU clusters.
  • To run the notebook, create a cluster with:
    • Two workers
    • Databricks Runtime 6.3 ML or above

Run distributed training using HorovodRunner

This section shows a simple example of distributed training using HorovodRunner. For more details, including information on migrating from single-node to distributed training, see the Databricks HorovodRunner documentation (AWS|Azure|GCP).

Set up checkpoint location

Databricks recommends saving training data under dbfs:/ml, which maps to file:/dbfs/ml on driver and worker nodes. dbfs:/ml is a special folder that provides high-performance I/O for deep learning workloads.

import os
import time
 
checkpoint_dir = '/dbfs/ml/MNISTDemo/train/{}'.format(time.time())

Create function to prepare data

The following cell creates a function that prepares the data for training. This function takes rank and size arguments so it can be used for both single-node and distributed training. In Horovod, rank is a unique process ID and size is the total number of processes.

This function downloads the data from keras.datasets, distributes the data across the available nodes, and converts the data to shapes and types needed for training.

from tensorflow.keras.datasets.mnist import load_data as mnist_load_data
from tensorflow.keras.utils import to_categorical as keras_to_categorical
 
def get_dataset(num_classes, rank=0, size=1):
  (x_train, y_train), (x_test, y_test) = mnist_load_data('MNIST-data-%d' % rank)
  x_train = x_train[rank::size]
  y_train = y_train[rank::size]
  x_test = x_test[rank::size]
  y_test = y_test[rank::size]
  x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
  x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
  x_train = x_train.astype('float32')
  x_test = x_test.astype('float32')
  x_train /= 255
  x_test /= 255
  y_train = keras_to_categorical(y_train, num_classes)
  y_test = keras_to_categorical(y_test, num_classes)
  return (x_train, y_train), (x_test, y_test)
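
As an optional sanity check (not part of the original notebook), you can call this function on the driver to confirm how the rank and size arguments partition the data: with the defaults you get the full MNIST training set, and with size=2 each process sees roughly half of it.

# Optional sanity check: the default rank=0, size=1 returns the full dataset,
# while size=2 slices it roughly in half for each process.
(x_train, y_train), (x_test, y_test) = get_dataset(num_classes=10)
print(x_train.shape, y_train.shape)   # expected: (60000, 28, 28, 1) (60000, 10)

(x_half, y_half), _ = get_dataset(num_classes=10, rank=0, size=2)
print(x_half.shape)                   # expected: (30000, 28, 28, 1)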

Create function to train model

The following cell defines the model using the tensorflow.keras API. This code is adapted from the Keras MNIST convnet example. The model consists of two convolutional layers, a max-pooling layer, two dropout layers, and two dense layers.

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense
 
def get_model(num_classes):
  model = Sequential()
  model.add(Conv2D(32, kernel_size=(3, 3),
            activation='relu',
            input_shape=(28, 28, 1)))
  model.add(Conv2D(64, (3, 3), activation='relu'))
  model.add(MaxPooling2D(pool_size=(2, 2)))
  model.add(Dropout(0.25))
  model.add(Flatten())
  model.add(Dense(128, activation='relu'))
  model.add(Dropout(0.5))
  model.add(Dense(num_classes, activation='softmax'))
  return model
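
As a quick, optional check (not in the original notebook), you can build the model on the driver and print its summary to confirm the layer stack before passing the definition to the training function.

# Optional check: instantiate the model locally and inspect its architecture.
model = get_model(num_classes=10)
model.summary()   # the final layer should be a Dense layer with 10 softmax outputs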

Create the training function for HorovodRunner

The following cell creates the training function. This function:

  • takes in the hyperparameters for tuning
  • configures Horovod
  • takes in the dataset
  • takes in the model definition
  • adds an optimizer
  • compiles the model
  • fits the model
  • returns the loss on the validation set

# Specify training parameters
epochs = 2
num_classes = 10
 
def train_hvd(learning_rate, batch_size, checkpoint_dir):
  """
  This function is passed to Horovod and executed on each worker.
  Pass in the hyperparameters we will tune with Hyperopt.
  """
  # Import modules on each worker (TensorFlow, Horovod, and standard libraries)
  from tensorflow.keras import backend as K
  from tensorflow.keras.models import Sequential
  import tensorflow as tf
  from tensorflow import keras
  import horovod.tensorflow.keras as hvd
  import os
  import shutil
  
  # Initialize Horovod
  hvd.init()
 
  # Pin GPU to be used to process local rank (one GPU per process)
  # These steps are skipped on a CPU cluster
  gpus = tf.config.experimental.list_physical_devices('GPU')
  for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
  if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    
  (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
  model = get_model(num_classes)
 
  # Adjust learning rate based on number of GPUs
  optimizer = keras.optimizers.Adadelta(lr=learning_rate * hvd.size())
 
  # Use the Horovod Distributed Optimizer
  optimizer = hvd.DistributedOptimizer(optimizer)
 
  model.compile(optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=['accuracy'])
 
  # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
  # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
  callbacks = [
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
  ]
 
  # Save checkpoints only on worker 0 to prevent conflicts between workers
  if hvd.rank() == 0:
      param_str = 'learning_rate_{lr}_batch_size_{bs}'.format(lr=learning_rate, bs=batch_size)
      checkpoint_dir_for_this_trial = os.path.join(checkpoint_dir, param_str)
      local_ckpt_path = os.path.join(checkpoint_dir_for_this_trial, 'checkpoint-{epoch}.ckpt')
      callbacks.append(keras.callbacks.ModelCheckpoint(local_ckpt_path, save_weights_only=True))
  
  model.fit(x_train, y_train,
            batch_size=batch_size,
            callbacks=callbacks,
            epochs=epochs,
            verbose=2,
            validation_data=(x_test, y_test))
  return model.evaluate(x_test, y_test)

Use Hyperopt to tune hyperparameters

In this section, you create the Hyperopt workflow.

  • Define a function to minimize
  • Define a search space over hyperparameters
  • Specify the search algorithm and use fmin() to tune the model

For more information about the Hyperopt APIs, see the Hyperopt docs.

Define a function to minimize

  • Input: hyperparameters
  • Internally: Use the training function defined for HorovodRunner to run distributed training, fit a model, and compute its loss. To run this example on a cluster with two workers, each with a single GPU, initialize HorovodRunner with np=2, as shown in the code below.
  • Output: loss

from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from sparkdl import HorovodRunner
 
def train(params):
  """
  An example train method that calls into HorovodRunner.
  This method is passed to hyperopt.fmin().
  
  :param params: hyperparameters. Its structure is consistent with how search space is defined. See below.
  :return: dict with fields 'loss' (scalar loss) and 'status' (success/failure status of run)
  """
  hr = HorovodRunner(np=2)
  loss, acc = hr.run(train_hvd,
                     learning_rate=params['learning_rate'],
                     batch_size=params['batch_size'],
                     checkpoint_dir=checkpoint_dir)
  return {'loss': loss, 'status': STATUS_OK}
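
Before running a full Hyperopt search, you may want to smoke-test the training function end to end. The sketch below assumes the HorovodRunner convention that a negative np runs the job locally on the driver (useful for debugging); the hyperparameter values are illustrative only.

# Optional smoke test (assumption: a negative np runs Horovod locally on the driver).
hr_local = HorovodRunner(np=-1)
test_loss, test_acc = hr_local.run(train_hvd,
                                   learning_rate=0.1,
                                   batch_size=64,
                                   checkpoint_dir=checkpoint_dir)
print(test_loss, test_acc)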

Define the search space over hyperparameters

This example tunes two hyperparameters: learning rate and batch size. See the Hyperopt docs for details on defining a search space and parameter expressions.

import numpy as np
space = {
  'learning_rate': hp.loguniform('learning_rate', np.log(1e-4), np.log(1e-1)),
  'batch_size': hp.choice('batch_size', [32, 64, 128]),
}
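
Specify the search algorithm and use fmin() to tune the model

The remaining step from the outline above is to choose a search algorithm and run the tuning job. The sketch below is a minimal example using the Tree-structured Parzen Estimator (tpe.suggest) with an illustrative max_evals budget; because each trial already launches a distributed HorovodRunner job, it uses the default sequential Trials object rather than SparkTrials.

# Minimal tuning sketch; max_evals is illustrative.
# Each trial runs a distributed HorovodRunner job, so use the default Trials
# object (trials run sequentially on the driver) rather than SparkTrials.
algo = tpe.suggest

best_param = fmin(
  fn=train,
  space=space,
  algo=algo,
  max_evals=8,
  trials=Trials())

print(best_param)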