Distributed Hyperopt and automated MLflow tracking

Databricks Runtime for Machine Learning includes Hyperopt, augmented with an implementation powered by Apache Spark. By using the SparkTrials extension of hyperopt.Trials, you can easily distribute a Hyperopt run without making other changes to your Hyperopt usage: when you call the hyperopt.fmin() function, you pass in an instance of SparkTrials as the trials argument. SparkTrials can accelerate single-machine tuning by distributing trials to Spark workers.
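
For example, here is a minimal sketch; the quadratic objective is a placeholder for your own model-training code:

    from hyperopt import fmin, hp, tpe, SparkTrials

    # Placeholder objective: minimize a quadratic. Replace with model training.
    def objective(x):
        return (x - 3) ** 2

    # Distribute trials to Spark workers, evaluating up to 4 at a time.
    spark_trials = SparkTrials(parallelism=4)

    best = fmin(
        fn=objective,
        space=hp.uniform("x", -10, 10),
        algo=tpe.suggest,
        max_evals=32,
        trials=spark_trials,
    )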

MLflow is an open source platform for managing the end-to-end machine learning lifecycle. The level of support for MLflow depends on the Databricks Runtime version:

  • Databricks Runtime 5.5 LTS ML
    • Supports automated MLflow tracking for hyperparameter tuning with Hyperopt and SparkTrials in Python. When automated MLflow tracking is enabled and you run fmin() with SparkTrials, hyperparameters and evaluation metrics are automatically logged to MLflow. Without automated MLflow tracking, you must make explicit API calls to log to MLflow. Automated MLflow tracking is enabled by default. To disable it, set the Spark configuration spark.databricks.mlflow.trackHyperopt.enabled to false (see the sketch after this list). You can still use SparkTrials to distribute tuning even without automated MLflow tracking.
    • Includes MLflow, so you do not need to install it separately.
  • Databricks Runtime 6.3 ML (Unsupported) and above support logging to MLflow from workers. You can add custom logging code in the objective function you pass to Hyperopt.
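
For example, a minimal sketch of disabling automated MLflow tracking from a notebook; this assumes spark is the active SparkSession in a Databricks notebook, and you can also set the configuration in the cluster's Spark config instead:

    # Disable automated MLflow tracking for Hyperopt runs (sketch).
    spark.conf.set("spark.databricks.mlflow.trackHyperopt.enabled", "false")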

How to use Hyperopt with SparkTrials

This section describes how to configure the arguments you pass to Hyperopt, best practices for using Hyperopt, and how to troubleshoot issues that may arise when using Hyperopt.

Note

For models created with MLlib, do not use SparkTrials, as the model building process is automatically parallelized on the cluster. Instead, use the Hyperopt class Trials.

fmin() arguments

The fmin() documentation has detailed explanations for all the arguments. Here are the important ones:

  • fn: The objective function to be called with a value generated from the hyperparameter space (space). fn can return the loss as a scalar value or in a dictionary (refer to the Hyperopt docs for details; see also the sketch after this list). This is usually where most of your code would be, for example, loss calculation, model training, and so on.
  • space: An expression that generates the hyperparameter space Hyperopt searches. A simple example is hp.uniform('x', -10, 10), which defines a single-dimension search space between -10 and 10. Hyperopt provides great flexibility in defining the hyperparameter space. After you are familiar with Hyperopt you can use this argument to make your tuning more efficient.
  • algo: The search algorithm Hyperopt uses to search the hyperparameter space (space). Typical values are hyperopt.rand.suggest for Random Search and hyperopt.tpe.suggest for TPE.
  • max_evals: The number of hyperparameter settings to try, that is, the number of models to fit. This number should be large enough to amortize overhead.
  • max_queue_len: The number of hyperparameter settings Hyperopt should generate ahead of time. Since the Hyperopt TPE generation algorithm can take some time, it can be helpful to increase this beyond the default value of 1, but generally no larger than the SparkTrials setting parallelism.
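
As a sketch of how these arguments fit together, the following example searches a two-dimensional space and returns the loss in a dictionary; the loss computation is a placeholder for your own training and evaluation code:

    from hyperopt import fmin, hp, tpe, SparkTrials, STATUS_OK

    # Hypothetical two-dimensional search space.
    space = {
        "learning_rate": hp.loguniform("learning_rate", -5, 0),
        "max_depth": hp.quniform("max_depth", 2, 10, 1),
    }

    def objective(params):
        # Placeholder loss; replace with model training and evaluation.
        loss = (params["learning_rate"] - 0.1) ** 2 + (params["max_depth"] - 5) ** 2
        return {"loss": loss, "status": STATUS_OK}

    best = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=64,
        max_queue_len=4,
        trials=SparkTrials(parallelism=8),
    )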

SparkTrials arguments and implementation

This section describes how to configure the arguments you pass to SparkTrials and implementation aspects of SparkTrials.

Arguments

  • parallelism: The maximum number of trials to evaluate concurrently. Greater parallelism allows scale-out testing of more hyperparameter settings.
    • Trade-offs: Since Hyperopt proposes new trials based on past results, there is a trade-off between parallelism and adaptivity. For a fixed max_evals, which is the max number of trials evaluated by fmin(), greater parallelism will lead to speedups, but lower parallelism may lead to better results since each iteration will have access to more past results.
    • Limits: There is currently a hard cap on parallelism of 128. SparkTrials will also check the cluster’s configuration to see how many concurrent tasks Spark will allow; if parallelism exceeds this maximum, SparkTrials will reduce parallelism to this maximum.
  • timeout: The maximum number of seconds an fmin() call can take. When this timeout is exceeded, all running trials are terminated and fmin() exits. Information about completed runs is preserved, and the best model is selected from among them. This argument can save you time and help you control your cluster cost.
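
For example, a sketch that caps concurrency at 8 trials and stops tuning after one hour:

    from hyperopt import SparkTrials

    # Evaluate up to 8 trials concurrently; terminate fmin() after 3600 seconds.
    spark_trials = SparkTrials(parallelism=8, timeout=3600)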

The complete SparkTrials API is included in the example notebook. To find it, search the notebook for help(SparkTrials).

Implementation

When defining the objective function fn passed to fmin(), and when selecting a cluster setup, it is helpful to understand how SparkTrials distributes tuning tasks.

In Hyperopt, a trial generally corresponds to fitting one model on one setting of hyperparameters. Hyperopt iteratively generates trials, evaluates them, and repeats.

With SparkTrials, the driver node of your cluster generates new trials, and worker nodes evaluate them. Each trial is generated with a Spark job that has one task and is evaluated in that task on a worker machine. If your cluster is set up to run multiple tasks per worker, then multiple trials may be evaluated at once on that worker.

Manage MLflow runs with SparkTrials

SparkTrials logs tuning results as nested MLflow runs as follows:

  • Main or parent run: The call to fmin() is logged as the “main” run. If there is an active run, SparkTrials logs under this active run and does not end the run when fmin() returns. If there is no active run, SparkTrials creates a new run, logs under it, and ends the run before fmin() returns.
  • Child runs: Each hyperparameter setting tested (a “trial”) is logged as a child run under the main run. For clusters running on Databricks Runtime 6.0 ML and above, the MLflow log records from workers are also stored under the corresponding child runs.

When calling fmin(), we recommend active MLflow run management; that is, wrap the call to fmin() inside a with mlflow.start_run(): statement. This ensures that each fmin() call is logged under its own MLflow “main” run, and it makes it easier to log extra tags, params, or metrics to that run.
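
For example, a minimal sketch of this pattern; the lambda objective is a placeholder for your own training code:

    import mlflow
    from hyperopt import fmin, hp, tpe, SparkTrials

    spark_trials = SparkTrials(parallelism=4)

    # Wrapping fmin() in an active run gives it its own "main" MLflow run.
    with mlflow.start_run():
        best = fmin(
            fn=lambda x: (x - 1) ** 2,  # placeholder objective
            space=hp.uniform("x", -5, 5),
            algo=tpe.suggest,
            max_evals=16,
            trials=spark_trials,
        )
        # Extra tags, params, or metrics logged here go to the main run.
        mlflow.set_tag("tuning_demo", "hyperopt")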

Note

When fmin() is called multiple times within the same active MLflow run, it logs those multiple fmin() calls to that same “main” run. To resolve name conflicts for MLflow params and tags, names with conflicts are mangled by appending a UUID.

When logging from workers in Databricks Runtime 6.3 ML (Unsupported) and above, you do not need to manage runs explicitly in the objective function. Call mlflow.log_param("param_from_worker", x) in the objective function to log a param to the child run. You can log params, metrics, tags, and artifacts in the objective function.
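
For example, a sketch of an objective function that logs from the worker; the loss is a placeholder for your own training code:

    import mlflow
    from hyperopt import STATUS_OK

    def objective(params):
        # Placeholder loss; replace with model training and evaluation.
        loss = (params["x"] - 2) ** 2
        # Logged on the worker; recorded under this trial's child run.
        mlflow.log_param("param_from_worker", params["x"])
        mlflow.log_metric("loss_from_worker", loss)
        return {"loss": loss, "status": STATUS_OK}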

The following notebook shows distributed Hyperopt + automated MLflow tracking in action.

Distributed Hyperopt + automated MLflow tracking notebook


After you perform the actions in the last cell in the notebook, your MLflow UI should display:

[Screenshot: Hyperopt MLflow demo]

See Model search using distributed Hyperopt and automated MLflow tracking for a demonstration of how to tune the hyperparameters for multiple models and arrive at a best model overall.