If you’re new to TensorFlow or deep learning, we recommend that you first work through the single-node TensorFlow guide.
In this guide we explore Yahoo’s TensorFlowOnSpark framework for distributed deep learning on Spark clusters, describing how it works and highlighting typical usage patterns. We then walk through a set of notebooks (referenced throughout the guide) to demonstrate how to train a neural network on MNIST in a distributed setting with TensorFlowOnSpark.
We’ve packaged the example notebooks for this guide into a downloadable DBC archive so you can easily import them into Databricks and run them yourself.
TensorFlowOnSpark, developed by Yahoo, is an open-source Python framework for launching TensorFlow-based distributed deep learning workflows on Spark clusters (see the original Yahoo blog post).
TensorFlowOnSpark simplifies deep learning training on Spark clusters by:
- Abstracting away the need to manually specify a cluster configuration (mapping from CPUs and GPUs to TensorFlow tasks)
- Providing APIs for feeding data from Spark RDDs to TensorFlow programs
- Leveraging Spark’s built-in fault-tolerance (to recover from failures during TF training)
TensorFlowOnSpark launches distributed model training as a single Spark job.
The entry point to model training is TFCluster.run (see its API documentation), which runs a user-specified Python function in each Spark executor, passing a context argument ctx. The context argument contains global and executor-specific information on the current distributed training run. For example, ctx.job_name indicates whether the current Spark executor should act as a TensorFlow worker or parameter server, while ctx.cluster_spec contains a global mapping from TensorFlow job name (for example, "worker") to CPU/GPU devices. The user-specified function is responsible for inspecting the context argument and running the appropriate logic (for example, running parameter server or worker code).
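The dispatch described above can be sketched in plain Python. This is a minimal illustration, not the example notebooks' code: run_parameter_server, run_worker, and the stand-in context object are hypothetical, and the "ps" job name and the task_index attribute are assumptions about what the context carries.

```python
from types import SimpleNamespace


def run_parameter_server(ctx):
    # Placeholder: real code would start a TensorFlow server and block.
    return "ps:%d" % ctx.task_index


def run_worker(ctx):
    # Placeholder: real code would build the graph and run training steps.
    return "worker:%d" % ctx.task_index


def main_fun(argv, ctx):
    """A function of the shape passed to TFCluster.run; it runs once per executor."""
    if ctx.job_name == "ps":  # assumed job name for parameter servers
        return run_parameter_server(ctx)
    if ctx.job_name == "worker":
        return run_worker(ctx)
    raise ValueError("unexpected job name: %r" % ctx.job_name)


# Stand-in for the context object, used only so the sketch runs
# without a Spark cluster.
fake_ctx = SimpleNamespace(job_name="worker", task_index=0)
print(main_fun(None, fake_ctx))
```

On a real cluster, a function like main_fun would be handed to TFCluster.run together with the SparkContext and the executor counts; consult the API documentation for the exact arguments.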
TensorFlowOnSpark workflows typically consist of three parts: training, evaluation against held-out samples, and monitoring.
Training:
- Runs on the Spark workers.
- Copies a distinct subset of the training data from S3 to each worker’s local hard drive.
- Fits the model against local data.
- Stores model checkpoints in DBFS.
- Stores event files (containing information on training loss) persistently in DBFS.
- Periodically syncs event files to the driver to be consumed by TensorBoard, which runs on the driver.
Evaluation:
- Runs on the Spark driver during training.
- Stores event files (containing accuracy information on an evaluation set) on the driver to be read by TensorBoard, periodically syncing them to DBFS for persistent storage.
Monitoring (TensorBoard):
- Runs on the Spark driver to read and display event files that contain metrics about the training run. Using TensorBoard in a distributed environment requires some additional setup, described in the example notebooks below.
- See the single-node TensorFlow guide for an overview of TensorBoard.
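The periodic syncing of event files between a local directory and persistent storage could look roughly like the sketch below. It is not the notebooks' implementation: sync_new_files is a hypothetical helper, and it assumes both locations are reachable as ordinary filesystem paths (for example, DBFS exposed through a local mount).

```python
import os
import shutil


def sync_new_files(src_dir, dst_dir):
    """Copy files from src_dir to dst_dir when missing or changed; return names copied."""
    os.makedirs(dst_dir, exist_ok=True)
    copied = []
    for name in sorted(os.listdir(src_dir)):
        src = os.path.join(src_dir, name)
        dst = os.path.join(dst_dir, name)
        if not os.path.isfile(src):
            continue  # this sketch ignores subdirectories
        # Event files are appended to, so a size change signals new data.
        if not os.path.exists(dst) or os.path.getsize(dst) != os.path.getsize(src):
            shutil.copyfile(src, dst)
            copied.append(name)
    return copied
```

A background loop would call this helper every few seconds with the source and destination directories swapped as needed: local to DBFS for persistence, or DBFS to the driver for TensorBoard.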
We recommend training against data downloaded from S3 to the local disks of the workers. We’ve found this approach to be the most robust, although it limits the size of the training dataset to the total disk space available across all workers. If needed, you can increase the disk space available on your cluster by attaching EBS volumes. The notebooks below also include a utility function to download data from S3 to each worker’s local disk.
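One simple way to give each worker a distinct subset of the training files is round-robin assignment by worker index. The sketch below shows only the assignment step; keys_for_worker and the key names are hypothetical, and the actual download from S3 is left to the utility function in the notebooks.

```python
def keys_for_worker(keys, num_workers, task_index):
    """Return the subset of keys assigned to one worker (round-robin)."""
    if not 0 <= task_index < num_workers:
        raise ValueError("task_index must be in [0, num_workers)")
    # Sort first so every worker computes the same assignment.
    return sorted(keys)[task_index::num_workers]


# Hypothetical object keys, for illustration only.
keys = ["train/part-%02d" % i for i in range(5)]
for idx in range(2):
    print(idx, keys_for_worker(keys, 2, idx))
```

Because each worker derives its own slice from the same sorted list, no coordination between workers is needed, and the subsets are disjoint and together cover every file.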
The ensuing pages of this guide walk through a series of notebooks demonstrating how to train MNIST in a distributed setting using TensorFlowOnSpark.
Helper Notebooks (shared by model training & evaluation)
Demonstrates how to download data from S3 and create a data ingest pipeline (load training data from disk into in-memory tensors) using tf.data APIs in TensorFlow. Our sample data is already stored in a public S3 bucket and has been split into training & validation sets.
Demonstrates how to construct a TensorFlow graph for distributed model training; uses distributed TensorFlow primitives, but does not contain any TensorFlowOnSpark code.
Contains constants used for model training and evaluation (specifies hyperparameters, location of training & test data on S3, etc.)
To launch model training, you need only run this notebook. It uses the helpers defined in the preceding notebooks to build the model graph, then calls TensorFlowOnSpark APIs to launch distributed model training on the Spark workers.
This notebook runs solely on the Spark driver and should be run concurrently with the Launching Model Training notebook. It downloads a validation dataset from S3, periodically loads the partially-trained model from checkpoint files stored on the driver, computes model accuracy on the validation dataset, and writes summary information to a local event file to be consumed by TensorBoard.
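The polling pattern behind this kind of evaluation can be sketched as follows. This is an assumption-laden outline rather than the notebook's code: latest_checkpoint and evaluate are hypothetical callables supplied by the caller (the first returns the newest checkpoint path or None, the second computes a metric for a checkpoint).

```python
import time


def evaluate_new_checkpoints(latest_checkpoint, evaluate, max_polls, poll_seconds=0.0):
    """Poll for checkpoints and evaluate each new one exactly once."""
    results = []
    last_seen = None
    for _ in range(max_polls):
        path = latest_checkpoint()
        # Skip the poll if training has not produced a new checkpoint yet.
        if path is not None and path != last_seen:
            results.append((path, evaluate(path)))
            last_seen = path
        time.sleep(poll_seconds)
    return results
```

Tracking the last evaluated path avoids recomputing accuracy, and writing duplicate summaries, when the evaluator polls faster than training checkpoints.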
This notebook runs TensorBoard on the driver, consuming event files from a directory on the driver’s local filesystem.
The example notebooks are available as a DBC archive.
They are designed to run on clusters with multiple worker machines, and therefore will not run on Community Edition. We recommend using at least two Spark workers with the following libraries:
- tensorflow-gpu==1.4 (if running on a GPU cluster)
- tensorflow-on-spark: Download the egg or build it from source, and attach it to your cluster as a library.
If you use a CPU cluster, configure Spark to use a single executor per machine (this is the default setting on GPU-enabled clusters). You can do this by setting
1 on the cluster creation page. See Spark Config for more info.
To build a TensorFlowOnSpark egg for use on Databricks, clone the project and run
python setup.py bdist_egg from the root directory. The example above has been tested
against an egg built from TensorFlowOnSpark commit
- My TensorFlowOnSpark program is hanging while “waiting for x reservations”
This can happen if your cluster has fewer executors than the number passed to TFCluster.run, in which case you can simply specify an appropriate number of executors. However, you might also encounter this issue on GPU clusters if your program fails with an error and you then attempt to rerun it. In this case, we recommend restarting your cluster.
The issue is caused by TensorFlowOnSpark running parameter server logic in a subprocess of a PySpark worker process. The parameter server subprocess acquires a GPU and then blocks. Since it’s a subprocess, the parameter server doesn’t die when its parent process is killed on task failure. Instead, it continues to hold a GPU, blocking future attempts to reserve a GPU.
- How do I view logs generated during model training?
Model training runs on the Spark workers, and you can find training logs in the stderr of Spark workers. To view training logs, navigate to Clusters, click the current cluster, then navigate to Spark Cluster UI - Master. Then click an individual worker and view its stderr to see training logs.
- Where are model checkpoints and event files stored in the example workflow?
Here’s a summary of what gets stored where:
Training data: Stored on S3, copied to a local directory on each worker
Model checkpoints: Stored on DBFS
Event files: Stored on DBFS (generated locally by the driver and chief worker, then synced to DBFS).
Event files are also periodically synced to a local directory on the driver, to be consumed by TensorBoard.
In our example, we define the destination directories of model checkpoints and event files in a single notebook. See Model Training & Evaluation Constants.
- Can I run TensorFlowOnSpark on CPU-only clusters?
Yes, although it is likely more cost-effective to run your distributed training code on GPU clusters.