Use Ray on Databricks
Preview
This feature is in Public Preview.
Ray 2.3.0 and above supports creating Ray clusters and running Ray applications on Apache Spark clusters with Databricks. For information about getting started with machine learning on Ray, including tutorials and examples, see the Ray documentation. For more information about the Ray and Apache Spark integration, see the Ray on Spark API documentation.
Requirements
Databricks Runtime 12.0 ML and above.
Databricks Runtime cluster access mode must be either “assigned” mode or “no isolation shared” mode.
Install Ray
Use the following command to install Ray. The [default] extension is required by the Ray dashboard component.
%pip install ray[default]>=2.3.0
Create a Ray cluster
To create a Ray cluster, use the ray.util.spark.setup_ray_cluster API.
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
    num_worker_nodes=2,
    num_cpus_per_node=4,
    collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
The ray.util.spark.setup_ray_cluster API creates a Ray cluster on Spark. Internally, it creates a background Spark job. Each Spark task in the job creates a Ray worker node, and the Ray head node is created on the driver. The argument num_worker_nodes represents the number of Ray worker nodes to create. To specify the number of CPU or GPU cores assigned to each Ray worker node, set the argument num_cpus_per_node or num_gpus_per_node.
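For example, on a GPU-enabled Databricks cluster, a sketch of a setup call that assigns one GPU to each Ray worker node might look like the following; the node and core counts are illustrative assumptions, not recommendations:
from ray.util.spark import setup_ray_cluster

# Illustrative values: two Ray worker nodes, each with 4 CPU cores and 1 GPU.
setup_ray_cluster(
    num_worker_nodes=2,
    num_cpus_per_node=4,
    num_gpus_per_node=1,
)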
After a Ray cluster is created, you can run any Ray application code directly in your notebook. The notebook also displays an Open Ray Cluster Dashboard in a new tab link, which you can use to view the Ray dashboard for the cluster.
Tip
If you’re using a Databricks assigned mode cluster, you can set num_worker_nodes to ray.util.spark.MAX_NUM_WORKER_NODES in order to use all available resources for your Ray cluster.
setup_ray_cluster(
    # ...
    num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)
You can set the argument collect_log_to_path to specify the destination path where you want to collect the Ray cluster logs. Log collection runs after the Ray cluster is shut down. Databricks recommends that you set a path starting with /dbfs/ so that the logs are preserved even if you terminate the Spark cluster. Otherwise, your logs are not recoverable, because the local storage on the cluster is deleted when the cluster is shut down.
Note
Calling ray.util.spark.setup_ray_cluster sets the RAY_ADDRESS environment variable to the address of the created Ray cluster, so your Ray application automatically uses this Ray cluster. You can specify an alternative cluster address using the address argument of the ray.init API.
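For example, a minimal sketch of connecting to a different, existing Ray cluster by passing an explicit address; the address value below is a placeholder, not a real endpoint:
import ray

# Placeholder address; replace it with the address of the Ray cluster you
# want to connect to. If address is omitted, ray.init() uses the RAY_ADDRESS
# environment variable set by setup_ray_cluster.
ray.init(address="<other-ray-cluster-address>")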
Run a Ray application
After the Ray cluster has been created, you can run any Ray application code in a Databricks notebook.
Important
Databricks recommends that you install any necessary libraries for your application with %pip install <your-library-dependency> to ensure that they are available to both your Ray cluster and your application. Specifying dependencies in the Ray init function call installs the dependencies in a location that is inaccessible to the Spark worker nodes, which results in version incompatibilities and import errors.
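Because installing packages with %pip shuts down any running Ray cluster (see Limitations), a practical ordering is to run all %pip installs first and create the Ray cluster afterwards. A minimal sketch, where the extra package name is purely hypothetical:
# Cell 1: install all dependencies up front.
# "some-application-dependency" is a hypothetical example of an application library.
%pip install ray[default]>=2.3.0 some-application-dependency

# Cell 2 (later in the notebook): create the Ray cluster only after all installs are done.
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(num_worker_nodes=2, num_cpus_per_node=4)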
For example, you can run a simple Ray application in a Databricks notebook as follows:
import ray
import random
import time
from fractions import Fraction

ray.init()

@ray.remote
def pi4_sample(sample_count):
    """pi4_sample runs sample_count experiments, and returns the
    fraction of time it was inside the circle.
    """
    in_count = 0
    for i in range(sample_count):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            in_count += 1
    return Fraction(in_count, sample_count)

SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')

pi = pi4 * 4
print(float(pi))
Load data from a Spark DataFrame
To load a Spark DataFrame as a Ray Dataset, first save the Spark DataFrame to DBFS in Parquet or Delta format. To control DBFS access securely, Databricks recommends that you mount cloud object storage to DBFS. You can then create a ray.data.Dataset instance from the saved Spark DataFrame path using the following helper function:
import ray
import os
from urllib.parse import urlparse
def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
    # Write the Spark DataFrame to DBFS as Parquet, then read it back
    # through the /dbfs FUSE mount as a Ray Dataset.
    spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
    fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
    return ray.data.read_parquet(fuse_path)
# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")
# Provide a DBFS location to write the table to
data_location_2 = (
    "dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)

# Convert the Spark DataFrame to a Ray Dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
    spark_dataframe=spark_df,
    dbfs_tmp_path=data_location_2
)
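To confirm that the conversion worked, you can inspect the resulting Ray Dataset with standard Ray Data operations, for example:
# Basic sanity checks on the Ray Dataset.
print(ray_dataset.count())   # number of rows
print(ray_dataset.take(5))   # first five records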
Tune the Ray cluster configuration
The recommended configuration for each Ray worker node is:
Minimum 4 CPU cores per Ray worker node.
Minimum 10 GB of heap memory for each Ray worker node.
Therefore, when calling ray.util.spark.setup_ray_cluster, Databricks recommends setting num_cpus_per_node to a value of at least 4.
See Memory allocation for Ray worker nodes for details about tuning heap memory for each Ray worker node.
Memory allocation for Ray worker nodes
Each Ray worker node uses two types of memory: heap memory and object store memory. The allocated memory size for each type is determined as described below.
The total memory allocated to each Ray worker node is:
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES is the maximum number of Ray worker nodes that can be launched on the Spark worker node. This is determined by the argument num_cpus_per_node or num_gpus_per_node.
If you do not set the argument object_store_memory_per_node, then the heap memory size and object store memory size allocated to each Ray worker node are:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
If you do set the argument object_store_memory_per_node:
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
In addition, the object store memory size per Ray worker node is limited by the shared memory of the operating system. The maximum value is:
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY is the /dev/shm disk size configured for the Spark worker node.
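As a worked example of these formulas (all numbers are illustrative assumptions): a Spark worker node with 64 GB of physical memory and 16 CPU cores, num_cpus_per_node=4, and object_store_memory_per_node left unset yields at most 4 local Ray worker nodes:
# Illustrative values only.
spark_worker_node_physical_memory_gb = 64
max_number_of_local_ray_worker_nodes = 4  # 16 CPU cores / num_cpus_per_node=4

ray_worker_node_total_memory_gb = (
    spark_worker_node_physical_memory_gb / max_number_of_local_ray_worker_nodes * 0.8
)  # 12.8 GB

# object_store_memory_per_node is not set, so the 70/30 split applies.
ray_worker_node_heap_memory_gb = ray_worker_node_total_memory_gb * 0.7   # 8.96 GB
object_store_memory_per_node_gb = ray_worker_node_total_memory_gb * 0.3  # 3.84 GB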
Enable stack traces and flame graphs on the Ray Dashboard Actors page
On the Ray Dashboard Actors page, you can view stack traces and flame graphs for active Ray actors. To view this information, use the following command to install py-spy before you start the Ray cluster:
%pip install py-spy
Shut down a Ray cluster
To shut down a Ray cluster running on Databricks, call the ray.util.spark.shutdown_ray_cluster API.
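For example:
from ray.util.spark import shutdown_ray_cluster

# Shut down the Ray cluster created earlier with setup_ray_cluster.
shutdown_ray_cluster()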
Note
Ray clusters also shut down when:
You detach your interactive notebook from your Databricks cluster.
Your Databricks job completes.
Your Databricks cluster is restarted or terminated.
There’s no activity for the specified idle time.
Example notebook
The following notebook demonstrates how to create a Ray cluster and run a Ray application on Databricks.
Limitations
Ray cluster autoscaling is not supported yet. The ray.util.spark.setup_ray_cluster API can only start a Ray cluster with a fixed number of Ray worker nodes.
Multi-user shared Databricks clusters (isolation mode enabled) are not supported.
When using %pip to install packages, the Ray cluster will shut down. Make sure to start Ray after you’re done installing all of your libraries with %pip.
Using integrations that override the configuration from ray.util.spark.setup_ray_cluster can cause the Ray cluster to become unstable and can crash the Ray context. For example, using the xgboost_ray package and setting RayParams with an actor or cpus_per_actor configuration in excess of the Ray cluster configuration can silently crash the Ray cluster.
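For illustration only, a sketch of sizing RayParams to stay within the Ray cluster created earlier (num_worker_nodes=2, num_cpus_per_node=4); the field names assume the xgboost_ray RayParams API:
from xgboost_ray import RayParams

# Keep the actor count and CPUs per actor within the Ray cluster configuration
# from setup_ray_cluster(num_worker_nodes=2, num_cpus_per_node=4).
ray_params = RayParams(
    num_actors=2,      # at most the number of Ray worker nodes
    cpus_per_actor=4,  # at most num_cpus_per_node
)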