Use Ray on Databricks
With Ray 2.3.0 and above, you can create Ray clusters and run Ray applications on Apache Spark clusters with Databricks. For information about getting started with machine learning on Ray, including tutorials and examples, see the Ray documentation. For more information about the Ray and Apache Spark integration, see the Ray on Spark API documentation.
Requirements
Databricks Runtime 12.0 ML and above.
Databricks Runtime cluster access mode must be either “Assigned” mode or “No isolation shared” mode.
Install Ray
Use the following command to install Ray. The [default] extension is required by the Ray dashboard component.
%pip install "ray[default]>=2.3.0"
Create a user-specific Ray cluster in a Databricks cluster
To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.
In any Databricks notebook that is attached to a Databricks cluster, you can run the following command:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
    num_worker_nodes=2,
    num_cpus_worker_node=4,
    collect_log_to_path="/dbfs/path/to/ray_collected_logs",
)
The ray.util.spark.setup_ray_cluster API creates a Ray cluster on Spark. Internally, it creates a background Spark job. Each Spark task in the job creates a Ray worker node, and the Ray head node is created on the driver. The argument num_worker_nodes represents the number of Ray worker nodes to create. To specify the number of CPU cores or GPUs assigned to each Ray worker node, set the argument num_cpus_worker_node (default value: 1) or num_gpus_worker_node (default value: 0).
After a Ray cluster is created, you can run any Ray application code directly in your notebook. Click Open Ray Cluster Dashboard in a new tab to view the Ray dashboard for the cluster.
Tip
If you’re using a Databricks single user cluster, you can set num_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES to use all available resources for your Ray cluster.
import ray.util.spark
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
    # ...
    num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)
Set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down. Databricks recommends that you set a path starting with /dbfs/ so that the logs are preserved even if you terminate the Spark cluster. Otherwise, your logs are not recoverable, because the local storage on the cluster is deleted when the cluster is shut down.
Note
To make your Ray application automatically use the Ray cluster that was created, call ray.util.spark.setup_ray_cluster, which sets the RAY_ADDRESS environment variable to the address of the Ray cluster. You can specify an alternative cluster address using the address argument of the ray.init API.
Run a Ray application
After the Ray cluster has been created, you can run any Ray application code in a Databricks notebook.
Important
Databricks recommends that you install any libraries your application needs with %pip install <your-library-dependency> so that they are available to both your Ray cluster and your application. Specifying dependencies in the ray.init call installs them in a location that the Spark worker nodes cannot access, which results in version incompatibilities and import errors.
For example, you can run a simple Ray application in a Databricks notebook as follows:
import ray
import random
import time
from fractions import Fraction
ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
Create a Ray cluster in autoscaling mode
With Ray 2.8.0 and above, you can create a Ray cluster on a Databricks cluster that scales up or down according to the workload. This autoscaling integration triggers Databricks cluster autoscaling internally within the Databricks environment. See Databricks cluster autoscaling.
To enable autoscaling, run the following command:
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
    num_worker_nodes=8,
    autoscale=True,
    # ... other arguments
)
If autoscaling is enabled, num_worker_nodes indicates the maximum number of Ray worker nodes. The default minimum number of Ray worker nodes is 0, which means that an idle Ray cluster scales down to zero Ray worker nodes. Scaling to zero can slow the response when new work arrives, but it can greatly reduce costs.
In autoscaling mode, num_worker_nodes cannot be set to ray.util.spark.MAX_NUM_WORKER_NODES.
The following arguments configure the upscaling and downscaling speed:
autoscale_upscaling_speed represents the number of nodes allowed to be pending as a multiple of the current number of nodes. The higher the value, the more aggressive the upscaling. For example, if this is set to 1.0, the cluster can grow in size by at most 100% at any time.
autoscale_idle_timeout_minutes represents the number of minutes that need to pass before an idle worker node is removed by the autoscaler. The smaller the value, the more aggressive the downscaling.
With Ray 2.9.0 and above, you can also set autoscale_min_worker_nodes to prevent the Ray cluster from scaling down to zero workers when the Ray cluster is idle.
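To make the interaction between num_worker_nodes, autoscale_min_worker_nodes, and autoscale_upscaling_speed concrete, the following is a toy model of a single autoscaling step. The function name next_worker_target is hypothetical; it is not Ray's autoscaler code, and it assumes that at least one node may be pending when the cluster is at zero workers.

```python
import math


def next_worker_target(current: int, demanded: int, max_workers: int,
                       min_workers: int = 0, upscaling_speed: float = 1.0) -> int:
    """Toy model of one autoscaling step for Ray-on-Spark worker nodes.

    max_workers stands in for num_worker_nodes (the maximum when autoscale=True),
    min_workers for autoscale_min_worker_nodes, and upscaling_speed for
    autoscale_upscaling_speed. This is an illustration, not Ray's implementation.
    """
    # Clamp the requested size into [min_workers, max_workers].
    desired = max(min_workers, min(demanded, max_workers))
    if desired <= current:
        # Idle nodes are removed after autoscale_idle_timeout_minutes;
        # this toy model simply returns the eventual size.
        return desired
    # At most upscaling_speed * current nodes may be pending at once.
    # Assumption: allow at least one pending node so a cluster at zero can grow.
    max_pending = max(1, math.floor(upscaling_speed * current))
    return min(desired, current + max_pending)
```

Starting from zero workers with demand for eight and autoscale_upscaling_speed=1.0, repeated calls return 1, 2, 4, and 8, so the cluster at most doubles per step. Ray's real autoscaler may apply additional rules, so treat these numbers as illustrative.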
Connect to remote Ray cluster using Ray client
In Ray 2.9.3, create a Ray cluster by calling the setup_ray_cluster API. In the same notebook, call the ray.init() API to connect to this Ray cluster.
For a Ray cluster that is not in global mode, get the remote connection string with the following code:
from ray.util.spark import setup_ray_cluster
# The second element of the returned tuple is the remote connection string
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2)  # plus any other arguments
Connect to the remote cluster using this remote connection string:
import ray
ray.init(remote_conn_str)
The Ray client does not support the Ray dataset API defined in the ray.data module. As a workaround, you can wrap your code that calls the Ray dataset API inside a remote Ray task, as shown in the following code:
import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
Load data from a Spark DataFrame
To load a Spark DataFrame as a Ray Dataset, first save the Spark DataFrame in Parquet format to Unity Catalog volumes or to the Databricks File System (DBFS, deprecated). To control DBFS access securely, Databricks recommends that you mount cloud object storage to DBFS. Then, create a ray.data.Dataset instance from the saved Spark DataFrame path using the following helper method:
import ray
from urllib.parse import urlparse

def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    # Write the DataFrame as Parquet, then read it back through the /dbfs FUSE mount
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)

# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")
# Provide a dbfs location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)
# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)
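The helper above builds its read path as "/dbfs" + urlparse(dbfs_tmp_path).path, which only works for dbfs:/ URIs, even though the text also mentions Unity Catalog volumes. The following sketch, a hypothetical helper named to_local_read_path, handles both cases under two assumptions: DBFS URIs are exposed through the /dbfs FUSE mount, and Unity Catalog volume paths (/Volumes/<catalog>/<schema>/<volume>/...) are already POSIX paths readable from every node. It is illustrative only, not a Databricks or Ray API.

```python
from urllib.parse import urlparse


def to_local_read_path(write_path: str) -> str:
    """Map a Spark write path to a local path that ray.data.read_parquet can open.

    Assumptions (hypothetical helper): dbfs:/ URIs are mounted at /dbfs on every
    node, and /Volumes/... paths are already local POSIX paths.
    """
    parsed = urlparse(write_path)
    if parsed.scheme == "dbfs":
        # dbfs:/home/x -> /dbfs/home/x
        return "/dbfs" + parsed.path
    if parsed.scheme == "" and parsed.path.startswith("/Volumes/"):
        # Unity Catalog volume paths need no translation.
        return parsed.path
    raise ValueError(f"Unsupported path for this sketch: {write_path!r}")
```

In the helper above, you could replace the fuse_path line with fuse_path = to_local_read_path(dbfs_tmp_path).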
Load data from a Unity Catalog table through Databricks SQL warehouse
For Ray 2.8.0 and above, you can call the ray.data.read_databricks_tables API to load data from a Databricks Unity Catalog table.
First, set the DATABRICKS_TOKEN environment variable to your Databricks warehouse access token. If you’re not running your program on Databricks Runtime, also set the DATABRICKS_HOST environment variable to the Databricks workspace URL, as shown in the following:
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
Then, call ray.data.read_databricks_tables() to read from the Databricks SQL warehouse.
import ray
ray_dataset = ray.data.read_databricks_tables(
    warehouse_id='...',  # Databricks SQL warehouse ID
    catalog='catalog_1',  # Unity Catalog catalog name
    schema='db_1',  # Schema name
    query="SELECT title, score FROM movie WHERE year >= 1980",
)
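The environment-variable requirement above reduces to a small rule: DATABRICKS_TOKEN is always needed, and DATABRICKS_HOST is needed only outside Databricks Runtime. The following sketch uses a hypothetical helper, missing_databricks_env, to check that rule before you call ray.data.read_databricks_tables. It does not validate the token or the host value.

```python
import os
from typing import List, Mapping


def missing_databricks_env(on_databricks_runtime: bool,
                           env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty (hypothetical helper)."""
    required = ["DATABRICKS_TOKEN"]
    if not on_databricks_runtime:
        # Outside Databricks Runtime, the workspace URL must also be provided.
        required.append("DATABRICKS_HOST")
    return [name for name in required if not env.get(name)]
```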
Configure resources used by Ray head node
By default, for the Ray on Spark configuration, Databricks restricts resources allocated to the Ray head node to:
0 CPU cores
0 GPUs
128 MB heap memory
128 MB object store memory
This is because the Ray head node is usually used for global coordination, not for executing Ray tasks. The Spark driver node resources are shared with multiple users, so the default setting saves resources on the Spark driver side.
With Ray 2.8.0 and above, you can configure the resources used by the Ray head node with the following arguments of the setup_ray_cluster API:
num_cpus_head_node: the number of CPU cores used by the Ray head node
num_gpus_head_node: the number of GPUs used by the Ray head node
object_store_memory_head_node: the object store memory size of the Ray head node
Support for heterogeneous clusters
For more efficient and cost-effective training runs, you can create a Ray on Spark cluster and set different configurations for the Ray head node and the Ray worker nodes. However, all Ray worker nodes must have the same configuration. Databricks clusters do not fully support heterogeneous clusters, but you can create a Databricks cluster with different driver and worker instance types by setting a cluster policy.
For example:
{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}
Tune the Ray cluster configuration
The recommended configuration for each Ray worker node is:
Minimum 4 CPU cores per Ray worker node.
Minimum 10 GB heap memory for each Ray worker node.
When calling ray.util.spark.setup_ray_cluster, Databricks recommends setting num_cpus_worker_node to a value >= 4.
See Memory allocation for Ray worker nodes for details about tuning heap memory for each Ray worker node.
Memory allocation for Ray worker nodes
Each Ray worker node uses two types of memory: heap memory and object store memory. The allocated memory size for each type is determined as described below.
The total memory allocated to each Ray worker node is:
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES is the maximum number of Ray worker nodes that can be launched on the Spark worker node. This is determined by the argument num_cpus_worker_node or num_gpus_worker_node.
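As a rough sketch of how num_cpus_worker_node and num_gpus_worker_node bound MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES, the hypothetical function below assumes that a Spark worker node fits as many Ray worker nodes as its CPUs allow and, when GPUs are requested per node, as many as its GPUs allow, whichever is smaller. Ray's actual computation goes through Spark task scheduling and may differ in edge cases.

```python
def max_local_ray_worker_nodes(spark_worker_cpus: int, spark_worker_gpus: int,
                               num_cpus_worker_node: int = 1,
                               num_gpus_worker_node: int = 0) -> int:
    """Estimate MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES for one Spark worker node.

    Assumption: the count is limited by CPUs, and also by GPUs when each
    Ray worker node requests GPUs; the smaller limit wins.
    """
    if num_cpus_worker_node < 1:
        raise ValueError("num_cpus_worker_node must be at least 1")
    count = spark_worker_cpus // num_cpus_worker_node
    if num_gpus_worker_node > 0:
        count = min(count, spark_worker_gpus // num_gpus_worker_node)
    return count
```

With the recommended setting of num_cpus_worker_node equal to the CPU count of the Spark worker node, the function returns 1, that is, one Ray worker node per Spark worker node.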
If you do not set the argument object_store_memory_per_node, then the heap memory size and object store memory size allocated to each Ray worker node are:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
If you do set the argument object_store_memory_per_node:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
In addition, the object store memory size per Ray worker node is limited by the shared memory of the operating system. The maximum value is:
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY is the /dev/shm disk size configured for the Spark worker node.
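The formulas above can be combined into one small calculator. The hypothetical ray_worker_memory function below is a sketch that follows those formulas literally, using integer arithmetic in whatever unit you pass (for example MiB). It is not Ray's implementation, and its rounding may differ slightly.

```python
from typing import Optional, Tuple


def ray_worker_memory(physical_memory: int, shm_size: int, max_local_ray_workers: int,
                      object_store_memory_per_node: Optional[int] = None) -> Tuple[int, int]:
    """Return (heap_memory, object_store_memory) for one Ray worker node.

    All sizes share one unit (for example MiB); integer division rounds down.
    """
    if max_local_ray_workers < 1:
        raise ValueError("max_local_ray_workers must be at least 1")
    # RAY_WORKER_NODE_TOTAL_MEMORY = physical / local_workers * 0.8
    total = physical_memory * 8 // (max_local_ray_workers * 10)
    # OBJECT_STORE_MEMORY_PER_NODE_CAP = shm / local_workers * 0.8
    cap = shm_size * 8 // (max_local_ray_workers * 10)
    if object_store_memory_per_node is None:
        heap = total * 7 // 10          # 70% heap
        object_store = total * 3 // 10  # 30% object store
    else:
        if object_store_memory_per_node >= total:
            raise ValueError("object store memory must be smaller than the total")
        heap = total - object_store_memory_per_node
        object_store = object_store_memory_per_node
    # The object store never exceeds the /dev/shm-based cap.
    return heap, min(object_store, cap)
```

For example, ray_worker_memory(65536, 32768, 4) models a Spark worker node with 64 GiB of physical memory, 32 GiB of /dev/shm, and four local Ray worker nodes. It returns (9174, 3932) in MiB: roughly 9 GiB of heap and 3.8 GiB of object store per Ray worker node.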
Best practices
How to set the number of CPUs and GPUs for each Ray worker node
Databricks recommends setting num_cpus_worker_node to the number of CPU cores per Spark worker node and setting num_gpus_worker_node to the number of GPUs per Spark worker node. In this configuration, each Spark worker node launches one Ray worker node that fully utilizes the resources of the Spark worker node.
GPU cluster configuration
The Ray cluster runs on top of a Databricks Spark cluster. A common scenario is to use a Spark job and Spark UDF to do simple data preprocessing tasks that do not need GPU resources, and then use Ray to execute complicated machine learning tasks that benefit from GPUs. In this case, Databricks recommends setting the Spark cluster level configuration parameter spark.task.resource.gpu.amount to 0, so that all Spark DataFrame transformations and Spark UDF executions do not use GPU resources.
The benefits of this configuration are the following:
It increases Spark job parallelism, because the GPU instance type usually has many more CPU cores than GPU devices.
If the Spark cluster is shared with multiple users, this configuration prevents Spark jobs from competing for GPU resources with concurrently running Ray workloads.
Disable transformers trainer MLflow integration if using it in Ray tasks
The transformers trainer MLflow integration is on by default. If you use Ray Train to train a transformers model, the Ray task fails because the Databricks MLflow service credential is not configured for Ray tasks.
To avoid this issue, set the DISABLE_MLFLOW_INTEGRATION environment variable to TRUE in the Databricks cluster configuration. For information about logging to MLflow from your Ray trainer tasks, see Using MLflow in Ray tasks.
Address Ray remote function pickling error
To execute Ray tasks, Ray uses pickle to serialize the task function. If pickling fails, determine the line or lines in your code where the failure occurs. Moving import statements into the task function often fixes common pickling errors. For example, datasets.load_dataset is a widely used function that is patched within Databricks Runtime, which can make a module-level import of it unpicklable. To fix this issue, update your code like this:
def ray_task_func():
    from datasets import load_dataset  # import the function inside the task function
    ...
Disable Ray memory monitor if the Ray task is unexpectedly killed with OOM error
In Ray 2.9.3, the Ray memory monitor has known issues that can cause Ray tasks to be killed erroneously.
To address the issue, disable the Ray memory monitor by setting the environment variable RAY_memory_monitor_refresh_ms to 0 in the Databricks cluster config.
Memory resource configuration for Spark and Ray hybrid workloads
If you run hybrid Spark and Ray workloads in a Databricks cluster, Databricks recommends that you reduce Spark executor memory to a small value, such as setting spark.executor.memory 4g in the Databricks cluster config. The Spark executor runs in a Java process that triggers garbage collection (GC) lazily, and Spark dataset caching creates high memory pressure, so a large executor heap reduces the memory available to Ray. To avoid potential OOM errors, set spark.executor.memory to a value smaller than the default.
Computation resource configuration for Spark and Ray hybrid workloads
If you run hybrid Spark and Ray workloads in a Databricks cluster, make the Spark cluster nodes autoscalable, the Ray worker nodes autoscalable, or both.
For example, if you have a fixed number of worker nodes in a Databricks cluster, consider enabling Ray-on-Spark autoscaling so that the Ray cluster scales down when no Ray workload is running. The idle cluster resources are then released for Spark jobs to use.
When the Spark job completes and a Ray job starts, the Ray-on-Spark cluster scales up to meet the processing demand.
You can also make both the Databricks cluster and the Ray-on-Spark cluster autoscalable. For example, you can configure the Databricks cluster to autoscale to a maximum of 10 worker nodes and the Ray-on-Spark cluster to a maximum of 4 worker nodes (with one Ray worker node per Spark worker), leaving Spark free to allocate up to 6 nodes for Spark tasks. Ray workloads can then use at most 4 nodes' worth of resources at the same time, while Spark jobs can use at most 6.
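The node arithmetic in the last example can be written as a small helper. The function spark_node_budget is hypothetical; it assumes Ray is at its maximum size and that the Spark worker nodes hosting Ray worker nodes (one or more Ray workers per Spark worker, as configured) are unavailable to Spark.

```python
def spark_node_budget(databricks_max_nodes: int, ray_max_worker_nodes: int,
                      ray_workers_per_spark_worker: int = 1) -> int:
    """Worker nodes left for Spark tasks when Ray-on-Spark is at its maximum size."""
    # Spark worker nodes occupied by Ray worker nodes, rounded up.
    ray_nodes = -(-ray_max_worker_nodes // ray_workers_per_spark_worker)
    if ray_nodes > databricks_max_nodes:
        raise ValueError("Ray cannot use more nodes than the Databricks cluster has")
    return databricks_max_nodes - ray_nodes
```

spark_node_budget(10, 4) returns 6, matching the example of a 10-node Databricks cluster with at most 4 Ray worker nodes.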
Applying transformation function to batches of data
When processing data in batches, Databricks recommends that you use the Ray Data API with the map_batches function. This approach can be more efficient and scalable, especially for large datasets or for complex computations that benefit from batch processing. You can convert any Spark DataFrame to a Ray Dataset with the ray.data.from_spark API, and write the result to a Databricks Unity Catalog table with the ray.data.write_databricks_table API.
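A map_batches transform is an ordinary function that receives a whole batch and returns a batch. The sketch below targets batch_format="numpy", in which each batch is a dict mapping column names to NumPy arrays; the function name add_total and the column names a, b, and total are hypothetical. The Ray calls appear only as comments because they need a running Ray-on-Spark cluster and a Spark DataFrame.

```python
from typing import Dict

import numpy as np


def add_total(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Add a 'total' column computed as a + b, vectorized over the whole batch."""
    return {**batch, "total": batch["a"] + batch["b"]}

# Usage with Ray Data (not run here; assumes numeric columns "a" and "b"):
# ds = ray.data.from_spark(spark_df)
# ds = ds.map_batches(add_total, batch_format="numpy")
```

Because the function operates on whole arrays rather than individual rows, NumPy can vectorize the work, which is the main reason batch processing tends to be faster than per-row transforms.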
Using MLflow in Ray tasks
To use MLflow in Ray tasks, configure the following:
Databricks MLflow credentials in Ray tasks.
MLflow runs created on the Spark driver side, whose generated run_id values are passed to Ray tasks.
The following code is an example:
import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
    import os
    os.environ.update(mlflow_db_creds)
    mlflow.set_experiment(experiment_name)
    # We need to use the run created in Spark driver side,
    # and set `nested=True` to make it a nested run inside the
    # parent run.
    with mlflow.start_run(run_id=run_id, nested=True):
        mlflow.log_metric(f"task_{x}_metric", x)
    return x

with mlflow.start_run() as run:  # create MLflow run in Spark driver side.
    results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Using notebook scoped python libraries or cluster python libraries in Ray tasks
Ray has a known issue where Ray tasks can’t use notebook-scoped Python libraries or cluster Python libraries. To work around this limitation, run the following command in your notebook before launching a Ray-on-Spark cluster:
%pip install ray==<The Ray version you want to use> --force-reinstall
Then run the following command in your notebook to restart the Python kernel:
dbutils.library.restartPython()
Enable stack traces and flame graphs on the Ray Dashboard Actors page
On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors.
To view this information, install py-spy before starting the Ray cluster:
%pip install py-spy
Shut down a Ray cluster
To shut down a Ray cluster running on Databricks, call the ray.util.spark.shutdown_ray_cluster API.
Note
Ray clusters also shut down when:
You detach your interactive notebook from your Databricks cluster.
Your Databricks job completes.
Your Databricks cluster is restarted or terminated.
There’s no activity for the specified idle time.
Example notebook
The following notebook demonstrates how to create a Ray cluster and run a Ray application on Databricks.
Limitations
Multi-user shared Databricks clusters (isolation mode enabled) are not supported.
When using %pip to install packages, the Ray cluster will shut down. Make sure to start Ray after you’re done installing all of your libraries with %pip.
Using integrations that override the configuration from ray.util.spark.setup_ray_cluster can cause the Ray cluster to become unstable and can crash the Ray context. For example, using the xgboost_ray package and setting RayParams with a num_actors or cpus_per_actor configuration in excess of the Ray cluster configuration can silently crash the Ray cluster.