Use Ray on Databricks

With Ray 2.3.0 and above, you can create Ray clusters and run Ray applications on Apache Spark clusters with Databricks. For information about getting started with machine learning on Ray, including tutorials and examples, see the Ray documentation. For more information about the Ray and Apache Spark integration, see the Ray on Spark API documentation.

Requirements

  • Databricks Runtime 12.0 ML and above.

  • Databricks Runtime cluster access mode must be either “Assigned” mode or “No isolation shared” mode.

Install Ray

Use the following command to install Ray. The [default] extension is required by the Ray dashboard component.

%pip install ray[default]>=2.3.0

Create a user-specific Ray cluster in a Databricks cluster

To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.

In any Databricks notebook that is attached to a Databricks cluster, you can run the following command:

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_worker_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

The ray.util.spark.setup_ray_cluster API creates a Ray cluster on Spark. Internally, it creates a background Spark job. Each Spark task in the job creates a Ray worker node, and the Ray head node is created on the driver. The argument num_worker_nodes represents the number of Ray worker nodes to create. To specify the number of CPU or GPU cores assigned to each Ray worker node, set the argument num_cpus_worker_node (default value: 1) or num_gpus_worker_node (default value: 0).

After a Ray cluster is created, you can run any Ray application code directly in your notebook. Click Open Ray Cluster Dashboard in a new tab to view the Ray dashboard for the cluster.

Tip

If you’re using a Databricks single user cluster, you can set num_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES to use all available resources for your Ray cluster.

setup_ray_cluster(
  # ...
  num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)

Set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down. Databricks recommends that you set a path starting with /dbfs/ so that the logs are preserved even if you terminate the Spark cluster. Otherwise, your logs are not recoverable since the local storage on the cluster is deleted when the cluster is shut down.

Note

“To have your Ray application automatically use the Ray cluster that was created, call ray.util.spark.setup_ray_cluster to set the RAY_ADDRESS environment variable to the address of the Ray cluster.” You can specify an alternative cluster address using the address argument of the ray.init API.

Run a Ray application

After the Ray cluster has been created, you can run any Ray application code in a Databricks notebook.

Important

Databricks recommends that you install any necessary libraries for your application with %pip install <your-library-dependency> to ensure they are available to your Ray cluster and application accordingly. Specifying dependencies in the Ray init function call installs the dependencies in a location inaccessible to the Spark worker nodes, which results in version incompatibilities and import errors.

For example, you can run a simple Ray application in a Databricks notebook as follows:

import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))

Create a Ray cluster in autoscaling mode

In Ray 2.8.0 and above, Ray clusters started on Databricks support integration with Databricks autoscaling. See Databricks cluster autoscaling.

With Ray 2.8.0 and above, you can create a Ray cluster on a Databricks cluster that supports scaling up or down according to workloads. This autoscaling integration triggers Databricks cluster autoscaling internally within the Databricks environment.

To enable autoscaling, run the following command:

from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
  num_worker_nodes=8,
  autoscale=True,
  ... # other arguments
)

If autoscaling is enabled, num_worker_nodes indicates the maximum number of Ray worker nodes. The default minimum number of Ray worker nodes is 0. This default setting means that when the Ray cluster is idle, it scales down to zero Ray worker nodes. This may not be ideal for fast responsiveness in all scenarios, but when enabled, can greatly reduce costs.

In autoscaling mode, num_worker_nodes cannot be set to ray.util.spark.MAX_NUM_WORKER_NODES.

The following arguments configure the upscaling and downscaling speed:

  • autoscale_upscaling_speed represents the number of nodes allowed to be pending as a multiple of the current number of nodes. The higher the value, the more aggressive the upscaling. For example, if this is set to 1.0, the cluster can grow in size by at most 100% at any time.

  • autoscale_idle_timeout_minutes represents the number of minutes that need to pass before an idle worker node is removed by the autoscaler. The smaller the value, the more aggressive the downscaling.

With Ray 2.9.0 and above, you can also set autoscale_min_worker_nodes to prevent the Ray cluster from scaling down to zero workers when the Ray cluster is idle.

Connect to remote Ray cluster using Ray client

In Ray 2.9.3, create a Ray cluster by calling the setup_ray_cluster API. In the same notebook, call the ray.init() API to connect to this Ray cluster.

For a Ray cluster that is not in global mode, get the remote connection string with following code:

To get the remote connection string using the following:

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

Connect to the remote cluster using this remote connection string:

import ray
ray.init(remote_conn_str)

The Ray client does not support the Ray dataset API defined in the ray.data module. As a workaround, you can wrap your code that calls the Ray dataset API inside a remote Ray task, as shown in the following code:

import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())

Load data from a Spark DataFrame

To load a Spark DataFrame as a Ray Dataset, first, you must save the Spark DataFrame to UC volumes or Databricks Filesystem (deprecated) as Parquet format. To control Databricks Filesystem access securely, Databricks recommends that you mount cloud object storage to DBFS. Then, you can create a ray.data.Dataset instance from the saved Spark DataFrame path using the following helper method:

import ray
import os
from urllib.parse import urlparse


def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")

# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)

Load data from a Unity Catalog table through Databricks SQL warehouse

For Ray 2.8.0 and above, you can call the ray.data.read_databricks_tables API to load data from a Databricks Unity Catalog table.

First, you need to set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token. If you’re not running your program on Databricks Runtime, set the DATABRICKS_HOST environment variable to the Databricks workspace URL, as shown in the following:

export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

Then, call ray.data.read_databricks_tables() to read from the Databricks SQL warehouse.

import ray

ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)

Configure resources used by Ray head node

By default, for the Ray on Spark configuration, Databricks restricts resources allocated to the Ray head node to:

  • 0 CPU cores

  • 0 GPUs

  • 128 MB heap memory

  • 128 MB object store memory

This is because the Ray head node is usually used for global coordination, not for executing Ray tasks. The Spark driver node resources are shared with multiple users, so the default setting saves resources on the Spark driver side.

With Ray 2.8.0 and above, you can configure resources used by the Ray head node. Use the following arguments in the setup_ray_cluster API:

  • num_cpus_head_node: setting CPU cores used by Ray head node

  • num_gpus_head_node: setting GPU used by Ray head node

  • object_store_memory_head_node: setting object store memory size by Ray head node

Support for heterogeneous clusters

For more efficient and cost effective training runs, you can create a Ray on Spark cluster and set different configurations between the Ray head node and Ray worker nodes. However, all Ray worker nodes must have the same configuration. Databricks clusters do not fully support heterogeneous clusters, but you can create a Databricks cluster with different driver and worker instance types by setting a cluster policy.

For example:

{
  "node_type_id": {
    "type": "fixed",
    "value": "i3.xlarge"
  },
  "driver_node_type_id": {
    "type": "fixed",
    "value": "g4dn.xlarge"
  },
  "spark_version": {
    "type": "fixed",
    "value": "13.x-snapshot-gpu-ml-scala2.12"
  }
}

Tune the Ray cluster configuration

The recommended configuration for each Ray worker node is:

  • Minimum 4 CPU cores per Ray worker node.

  • Minimum 10GB heap memory for each Ray worker node.

When calling ray.util.spark.setup_ray_cluster, Databricks recommends setting num_cpus_worker_node to a value >= 4.

See Memory allocation for Ray worker nodes for details about tuning heap memory for each Ray worker node.

Memory allocation for Ray worker nodes

Each Ray worker node uses two types of memory: heap memory and object store memory. The allocated memory size for each type is determined as described below.

The total memory allocated to each Ray worker node is:

RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES is the maximum number of Ray worker nodes that can be launched on the Spark worker node. This is determined by the argument num_cpus_worker_node or num_gpus_worker_node.

If you do not set the argument object_store_memory_per_node, then the heap memory size and object store memory size allocated to each Ray worker node are:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

If you do set the argument object_store_memory_per_node:

RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

In addition, the object store memory size per Ray worker node is limited by the shared memory of the operating system. The maximum value is:

OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY is the /dev/shm disk size configured for the Spark worker node.

Best practices

How to set CPU / GPU number for each Ray worker node ?

Databricks recommends setting num_cpus_worker_node to the number of CPU cores per Spark worker node and setting num_gpus_worker_node to the number of GPUs per Spark worker node. In this config, each Spark worker node launches one Ray worker node that fully utilizes the resources of the Spark worker node.

GPU cluster configuration

The Ray cluster runs on top of a Databricks Spark cluster. A common scenario is to use a Spark job and Spark UDF to do simple data preprocessing tasks that do not need GPU resources, and then use Ray to execute complicated machine learning tasks that benefit from GPUs. In this case, Databricks recommends setting the Spark cluster level configuration parameter spark.task.resource.gpu.amount to 0, so that all Spark DataFrame transformations and Spark UDF executions do not use GPU resources.

The benefits of this configuration are the following:

  • It increases Spark job parallelism, because the GPU instance type usually has many more CPU cores than GPU devices.

  • If the Spark cluster is shared with multiple users, this configuration prevents Spark jobs from competing for GPU resources with concurrently running Ray workloads.

Disable transformers trainer mlflow integration if using it in Ray tasks

The transformers trainer MLflow integration is on by default. If you use Ray train to train it, the Ray task fails because the Databricks MLflow service credential is not configured for Ray tasks.

To avoid this issue, set the DISABLE_MLFLOW_INTEGRATION environment variable to ‘TRUE’ in databricks cluster config. For information on logging into MLflow in your Ray trainer tasks, see the section “Using MLflow in Ray tasks” for details.

Address Ray remote function pickling error

To execute Ray tasks, Ray uses pickle to serialize the task function. If pickling fails, determine the line(s) in your code where the failure occurs. Often, moving import commands into the task function addresses common pickling errors. For example, datasets.load_dataset is a widely used function that happens to be patched within Databricks Runtime, potentially rendering an external import unpickle-able. To correct this issue, you can update your code like this:

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Disable Ray memory monitor if the Ray task is unexpectedly killed with OOM error

In Ray 2.9.3, Ray memory monitor has known issues which cause Ray tasks to be erroneously killed.

To address the issue, disable the Ray memory monitor by setting the environment variable RAY_memory_monitor_refresh_ms to 0 in the Databricks cluster config.

Memory resource configuration for Spark and Ray hybrid workloads

If you run hybrid Spark and Ray workloads in a Databricks cluster, Databricks recommends that you reduce Spark executor memory to a small value, such as setting spark.executor.memory 4g in the Databricks cluster config. This is due to the Spark executor running within a Java process that triggers garbage collection (GC) lazily. The memory pressure for Spark dataset caching is rather high, causing a reduction in the available memory that Ray can use. To avoid potential OOM errors, Databricks recommends you reduce the configured ‘spark.executor.memory’ value to a smaller value than the default.

Computation resource configuration for Spark and Ray hybrid workloads

If you run hybrid Spark and Ray workloads in a Databricks cluster, set either the Spark cluster nodes to auto-scalable, the Ray worker nodes to auto-scalable, or both with auto-scaling enabled.

For example, if you have a fixed number of worker nodes in a Databricks cluster, consider enabling Ray-on-Spark autoscaling, so that when there is no Ray workload running, the Ray cluster scales down. As a result, the idle cluster resources are released so that Spark job can use them.

When Spark job completes and Ray job starts, it triggers the Ray-on-Spark cluster to scale up to meet the processing demands.

You can also make both the Databricks cluster and the Ray-on-spark cluster auto-scalable. Specifically, you can configure the Databricks cluster auto-scalable nodes to a maximum of 10 nodes and the Ray-on-Spark worker nodes to a maximum of 4 nodes (with one Ray worker node per spark worker), leaving Spark free to allocate up to 6 nodes for Spark tasks. This means that Ray workloads can use at most 4 nodes resources at the same time, while the Spark job can allocate at most 6 nodes worth of resources.

Applying transformation function to batches of data

When processing data in batches, Databricks recommends you use the Ray Data API with map_batches function. This approach can be more efficient and scalable, especially for large datasets or when performing complex computations that benefit from batch processing. Any Spark DataFrame can be converted to Ray data using the ray.data.from_spark API, and can be written out to databricks UC table using the API ray.data.write_databricks_table.

Using MLflow in Ray tasks

To use MLflow in Ray tasks, configure the following:

  • Databricks MLflow credentials in Ray tasks

  • MLflow runs on the Spark driver side that pass the generated run_id values to Ray tasks.

The following code is an example:

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in Spark driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Using notebook scoped python libraries or cluster python libraries in Ray tasks

Currently, Ray has a known issue that Ray tasks can’t use notebook-scoped Python libraries or cluster Python libraries. To address this limitation, run the following command in your notebook prior to launching a Ray-on-Spark cluster:

%pip install ray==<The Ray version you want to use> --force-reinstall

and then run the following command in your notebook to restart python kernel:

dbutils.library.restartPython()

Enable stack traces and flame graphs on the Ray Dashboard Actors page

On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors.

To view this information, install py-spy before starting the Ray cluster:

%pip install py-spy

Shut down a Ray cluster

To shut down a Ray cluster running on Databricks, call the ray.utils.spark.shutdown_ray_cluster API.

Note

Ray clusters also shut down when:

  • You detach your interactive notebook from your Databricks cluster.

  • Your Databricks job completes.

  • Your Databricks cluster is restarted or terminated.

  • There’s no activity for the specified idle time.

Example notebook

The following notebook demonstrates how to create a Ray cluster and run a Ray application on Databricks.

Ray on Spark starter notebook

Open notebook in new tab

Limitations

  • Multi-user shared Databricks clusters (isolation mode enabled) are not supported.

  • When using %pip to install packages, the Ray cluster will shut down. Make sure to start Ray after you’re done installing all of your libraries with %pip.

  • Using integrations that override the configuration from ray.util.spark.setup_ray_cluster can cause the Ray cluster to become unstable and can crash the Ray context. For example, using the xgboost_ray package and setting RayParams with an actor or cpus_per_actor configuration in excess of the Ray cluster configuration can silently crash the Ray cluster.