from tensorflow.keras.datasets.mnist import load_data as mnist_load_data
from tensorflow.keras.utils import to_categorical as keras_to_categorical
def get_dataset(num_classes, rank=0, size=1):
    (x_train, y_train), (x_test, y_test) = mnist_load_data('MNIST-data-%d' % rank)
    # Shard the dataset: each worker takes every `size`-th sample, offset by its rank.
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]
    # Add a channel dimension and scale pixel values to [0, 1].
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    # One-hot encode the labels.
    y_train = keras_to_categorical(y_train, num_classes)
    y_test = keras_to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)
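As a quick single-process sanity check (illustrative only, not part of the distributed run), the function can be called with its defaults; with rank=0 and size=1 the slicing keeps every sample, so the full MNIST dataset is returned.
# Illustrative local check: rank=0, size=1 keeps every sample.
(x_tr, y_tr), (x_te, y_te) = get_dataset(num_classes=10)
print(x_tr.shape, y_tr.shape)  # expected: (60000, 28, 28, 1) (60000, 10)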
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense
def get_model(num_classes):
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=(28, 28, 1)))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    return model
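To verify the layer stack before distributing the training, the model can be built once on the driver and its summary printed. This is an optional sketch, not part of the original notebook:
# Optional: build the model locally and inspect the architecture.
model = get_model(num_classes=10)
model.summary()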
# Specify training parameters
epochs = 2
num_classes = 10
def train_hvd(learning_rate, batch_size, checkpoint_dir):
    """
    This function is passed to Horovod and executed on each worker.
    Pass in the hyperparameters to be tuned with Hyperopt.
    """
    # Import tensorflow modules on each worker
    from tensorflow.keras import backend as K
    from tensorflow.keras.models import Sequential
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd
    import os
    import shutil

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
    model = get_model(num_classes)

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(lr=learning_rate * hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is
    # started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        param_str = 'learning_rate_{lr}_batch_size_{bs}'.format(lr=learning_rate, bs=batch_size)
        checkpoint_dir_for_this_trial = os.path.join(checkpoint_dir, param_str)
        local_ckpt_path = os.path.join(checkpoint_dir_for_this_trial, 'checkpoint-{epoch}.ckpt')
        callbacks.append(keras.callbacks.ModelCheckpoint(local_ckpt_path, save_weights_only=True))

    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    return model.evaluate(x_test, y_test)
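Before wrapping train_hvd in Hyperopt, it can be launched once through HorovodRunner with fixed hyperparameters to confirm that the distributed setup works. This is a sketch; the learning rate, batch size, and checkpoint path below are illustrative assumptions:
# Illustrative single distributed run with fixed hyperparameters.
from sparkdl import HorovodRunner

hr = HorovodRunner(np=2)
loss, acc = hr.run(train_hvd,
                   learning_rate=0.1,  # assumed value for illustration
                   batch_size=64,      # assumed value for illustration
                   checkpoint_dir='/dbfs/ml/horovod_checkpoints')  # hypothetical path
print(loss, acc)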
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from sparkdl import HorovodRunner
def train(params):
    """
    An example train method that calls into HorovodRunner.
    This method is passed to hyperopt.fmin().

    :param params: hyperparameters. Its structure is consistent with how the search space is defined. See below.
    :return: dict with fields 'loss' (scalar loss) and 'status' (success/failure status of run)
    """
    hr = HorovodRunner(np=2)
    loss, acc = hr.run(train_hvd,
                       learning_rate=params['learning_rate'],
                       batch_size=params['batch_size'],
                       checkpoint_dir=checkpoint_dir)
    return {'loss': loss, 'status': STATUS_OK}
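To complete the tuning loop, pass train to hyperopt.fmin together with a search space. The sketch below assumes a log-uniform range for the learning rate and a small set of batch sizes; the ranges and the max_evals value are illustrative, not taken from the original notebook:
import numpy as np

# Illustrative search space; the ranges and choices below are assumptions.
space = {
    'learning_rate': hp.loguniform('learning_rate', np.log(1e-4), np.log(1e-1)),
    'batch_size': hp.choice('batch_size', [32, 64, 128]),
}

# With HorovodRunner handling distribution, use the plain (serial) Trials object.
best_hyperparam = fmin(fn=train,
                       space=space,
                       algo=tpe.suggest,
                       max_evals=4,
                       trials=Trials())
print(best_hyperparam)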
Distributed training with Hyperopt and HorovodRunner
Databricks Runtime for Machine Learning includes Hyperopt, a library for ML hyperparameter tuning in Python, and HorovodRunner, a general API to run distributed deep learning workloads on Databricks using the Horovod framework.
Use case: Distributed deep learning workloads in Python for which you want to tune hyperparameters.
In this example notebook
The demo is adapted from the Hyperopt documentation and the HorovodRunner documentation, with minor adjustments.
This guide consists of the following sections:
Requirements