def get_dataset(num_classes, rank=0, size=1):
    """Load and preprocess MNIST, sharded for distributed training.

    Each worker keeps every `size`-th sample starting at offset `rank`,
    so the defaults (rank=0, size=1) return the full dataset.
    Returns ((x_train, y_train), (x_test, y_test)) with images shaped
    (N, 28, 28, 1) in [0, 1] and one-hot labels.
    """
    from tensorflow import keras

    # Each rank caches its own copy of the raw download.
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data(
        'MNIST-data-%d' % rank)

    # Shard: every `size`-th example, offset by this worker's rank.
    x_train, y_train = x_train[rank::size], y_train[rank::size]
    x_test, y_test = x_test[rank::size], y_test[rank::size]

    # Add a trailing channel axis and scale pixel values into [0, 1].
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype('float32') / 255
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype('float32') / 255

    # One-hot encode the integer class labels.
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)
    return (x_train, y_train), (x_test, y_test)
def get_model(num_classes):
    """Build a small CNN classifier for 28x28x1 images.

    Two convolutions, max-pooling, dropout, and a dense head ending in a
    softmax over `num_classes` outputs.
    """
    from tensorflow.keras import models
    from tensorflow.keras import layers

    return models.Sequential([
        layers.Conv2D(32, kernel_size=(3, 3), activation='relu',
                      input_shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(num_classes, activation='softmax'),
    ])
# Specify training parameters
batch_size = 128
epochs = 2
num_classes = 10


def train(learning_rate=1.0):
    """Train the MNIST model on a single node and return the fitted model.

    Args:
        learning_rate: Adadelta learning rate. Kept as a parameter so that
            Horovod can scale it by the number of workers in the
            distributed variant of this function.
    """
    import tensorflow as tf
    from tensorflow import keras

    # Enable per-GPU memory growth; these steps are automatically
    # skipped on a CPU cluster.
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes)
    model = get_model(num_classes)

    # Specify the optimizer (Adadelta in this example). Use the
    # `learning_rate=` keyword rather than the deprecated `lr=` alias,
    # which newer Keras versions warn about.
    optimizer = keras.optimizers.legacy.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model
# Run single-node training as a baseline before the distributed run.
model = train(learning_rate=0.1)
Epoch 1/2
469/469 - 90s - loss: 0.5929 - accuracy: 0.8181 - val_loss: 0.2166 - val_accuracy: 0.9368 - 90s/epoch - 192ms/step
Epoch 2/2
469/469 - 90s - loss: 0.2903 - accuracy: 0.9133 - val_loss: 0.1505 - val_accuracy: 0.9545 - 90s/epoch - 191ms/step
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op, _jit_compiled_convolution_op while saving (showing 2 of 2). These functions will not be directly callable after loading.
INFO:tensorflow:Assets written to: /tmp/tmpv330mlaf/model/data/model/assets
INFO:tensorflow:Assets written to: /tmp/tmpv330mlaf/model/data/model/assets
# Evaluate the single-node model on the held-out test set.
_, (x_test, y_test) = get_dataset(num_classes)
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)
79/79 [==============================] - 3s 42ms/step - loss: 0.1505 - accuracy: 0.9545
loss: 0.15045417845249176
accuracy: 0.9545000195503235
def train_hvd(checkpoint_path, learning_rate=1.0):
    """Per-worker training function launched by HorovodRunner.

    Args:
        checkpoint_path: Path pattern where rank 0 writes per-epoch
            weight checkpoints.
        learning_rate: Base learning rate; scaled by the Horovod world
            size so the effective rate grows with the number of workers.
    """
    # Import tensorflow modules on each worker
    from tensorflow.keras import backend as K
    from tensorflow.keras.models import Sequential
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process).
    # These steps are automatically skipped on a CPU cluster.
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    # Shard the dataset by this worker's Horovod rank and the world size.
    (x_train, y_train), (x_test, y_test) = get_dataset(num_classes, hvd.rank(), hvd.size())
    model = get_model(num_classes)

    # Adjust learning rate based on number of workers. Use the
    # `learning_rate=` keyword rather than the deprecated `lr=` alias,
    # which newer Keras versions warn about.
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate * hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Broadcast the initial variable states from rank 0 to all other
    # processes. Required for consistent initialization whether training
    # starts from random weights or is restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(keras.callbacks.ModelCheckpoint(checkpoint_path,
                                                         save_weights_only=True))

    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
# Launch distributed training across 2 processes with HorovodRunner.
from sparkdl import HorovodRunner

# Checkpoint pattern includes the epoch number; assumes `checkpoint_dir`
# is defined in a cell outside this excerpt — TODO confirm.
checkpoint_path = checkpoint_dir + '/checkpoint-{epoch}.ckpt'
learning_rate = 0.1
hr = HorovodRunner(np=2)
hr.run(train_hvd, checkpoint_path=checkpoint_path, learning_rate=learning_rate)
WARNING:HorovodRunner:HorovodRunner will only stream logs generated by :func:`sparkdl.horovod.log_to_driver` or
:class:`sparkdl.horovod.tensorflow.keras.LogCallback` to notebook cell output. If want to stream all
logs to driver for debugging, you can set driver_log_verbosity to 'all', like `HorovodRunner(np=2,
driver_log_verbosity='all')`.
INFO:HorovodRunner:The global names read or written to by the pickled function are {'get_dataset': None, 'num_classes': None, 'get_model': None, 'batch_size': None, 'epochs': None}.
INFO:HorovodRunner:The pickled object size is 3960 bytes.
INFO:HorovodRunner:
### How to enable Horovod Timeline? ###
HorovodRunner has the ability to record the timeline of its activity with Horovod Timeline. To
record a Horovod Timeline, set the `HOROVOD_TIMELINE` environment variable to the location of the
timeline file to be created. You can then open the timeline file using the chrome://tracing
facility of the Chrome browser.
/databricks/spark/python/pyspark/sql/context.py:117: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
warnings.warn(
INFO:HorovodRunner:Start training.
import os

import tensorflow as tf

# Rebuild the model architecture and restore the latest checkpoint that
# rank 0 wrote during distributed training. `import os` is required here:
# the original cell used os.path.dirname without importing os.
hvd_model = get_model(num_classes)
# `learning_rate=` replaces the deprecated `lr=` keyword, which newer
# Keras versions warn about.
hvd_model.compile(optimizer=tf.keras.optimizers.Adadelta(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
hvd_model.load_weights(tf.train.latest_checkpoint(os.path.dirname(checkpoint_path)))
/databricks/python/lib/python3.9/site-packages/keras/optimizers/optimizer_v2/adadelta.py:77: UserWarning: The `lr` argument is deprecated, use `learning_rate` instead.
super(Adadelta, self).__init__(name, **kwargs)
Out[20]: <tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7f27ac4a14c0>
# Evaluate the restored distributed model on the full test set.
_, (x_test, y_test) = get_dataset(num_classes)
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)
79/79 [==============================] - 4s 42ms/step - loss: 0.1407 - accuracy: 0.9600
loaded model loss and accuracy: 0.14069345593452454 0.9599999785423279
import numpy as np

# Predict class probabilities for the first nine test images, then use
# rint() to round each probability to the nearest integer so the result
# reads as one-hot style predictions.
preds = np.rint(hvd_model.predict(x_test[:9]))
preds
1/1 [==============================] - 0s 73ms/step
Out[22]: array([[0., 0., 0., 0., 0., 0., 0., 1., 0., 0.],
[0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
[1., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
[0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 1., 0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
[0., 0., 0., 0., 0., 1., 0., 0., 0., 0.]], dtype=float32)
Distributed deep learning training using TensorFlow and Keras with HorovodRunner
This notebook illustrates the use of HorovodRunner for distributed training with the
tensorflow.keras
API. It first shows how to train a model on a single node, and then shows how to adapt the code using HorovodRunner for distributed training. The notebook runs on CPU and GPU clusters.

Requirements
Databricks Runtime 7.0 ML or above.
HorovodRunner is designed to improve model training performance on clusters with multiple workers, but multiple workers are not required to run this notebook.