Databricks で Ray クラスターを作成して接続する

Databricksで Ray コンピュート クラスターを作成、構成、実行する方法を学びます

要件

Ray クラスターを作成するには、次の設定でDatabricks汎用コンピュート リソースにアクセスできる必要があります。

  • Databricks Runtime 12.2 LTS ML 以上。

  • アクセス・モードは、「 シングル・ユーザー 」または 「分離共有なし」のいずれかである必要があります。

注:

Ray クラスターは現在、サーバーレス コンピュートではサポートされていません。

Rayのインストール

Databricks Runtime ML 15.0 以降では、Ray が Databricks クラスターにプリインストールされます。

15.0 より前にリリースされたランタイムの場合は、pip を使用してクラスターに Ray をインストールします。

%pip install ray[default]>=2.3.0

Databricks クラスターにユーザー固有の Ray クラスターを作成する

Ray クラスターを作成するには、 ray.util.spark.setup_ray_cluster APIです。

注:

ノートブックに Ray クラスターを作成すると、現在のノートブック ユーザーのみが使用できるようになります。 ノートブックがクラスターから切り離された後、または 30 分間非アクティブ状態が続いた後 (Ray にタスクが送信されていない)、Ray クラスターは自動的にシャットダウンされます。 すべてのユーザーと共有され、アクティブに実行されているノートブックの影響を受けない Ray クラスターを作成する場合は、代わりにray.util.spark.setup_global_ray_cluster API を使用します。

固定サイズのレイクラスター

Databricks クラスターに接続されている任意の Databricks ノートブックで、次のコマンドを実行して固定サイズの Ray クラスターを起動できます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  num_worker_nodes=2,
  num_cpus_per_node=4,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

自動スケーリング Ray クラスター

自動スケーリング Ray クラスターを開始する方法については、 Databricksでの Ray クラスターのスケーリング」を参照してください。

グローバルモードの開始 Ray クラスター

Ray 2.9.0 以降を使用すると、Databricks クラスター上にグローバル モードの Ray クラスターを作成できます。 グローバル モードの Ray クラスターを使用すると、Databricks クラスターに接続されているすべてのユーザーが Ray クラスターも使用できるようになります。 Ray クラスターを実行するこのモードには、シングルユーザー Ray クラスター インスタンスを実行するときにシングルユーザー クラスターが持つアクティブ タイムアウト機能はありません。

複数のユーザーが接続して Ray タスクを実行できるグローバル Ray クラスターを開始するには、まず Databricks ノートブック ジョブを作成し、それを共有モードの Databricks クラスターに接続してから、次のコマンドを実行します。

from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
  max_worker_nodes=2,
  ...
  # other arguments are the same as with the `setup_global_ray` API.
)

これは、ノートブック コマンド セルの [中断] ボタンをクリックするか、ノートブックを Databricks クラスターからデタッチするか、Databricks クラスターを終了することによって呼び出しを中断するまでアクティブなままになるブロッキング呼び出しです。 それ以外の場合、グローバル モードの Ray クラスターは引き続き実行され、承認されたユーザーによるタスクの送信が可能になります。 グローバル モード クラスターの詳細については、 Ray APIドキュメントを参照してください。

グローバル モード クラスターには次のプロパティがあります。

  • Databricks クラスターでは、一度に作成できるアクティブなグローバル モード Ray クラスターは 1 つだけです。

  • Databricks クラスターでは、アクティブなグローバル モードの Ray クラスターは、接続されているすべての Databricks ノートブック内のすべてのユーザーが使用できます。 ray.init()を実行すると、アクティブなグローバル モードの Ray クラスターに接続できます。 この Ray クラスターには複数のユーザーがアクセスできるため、リソースの競合が問題になる可能性があります。

  • グローバル モードの Ray クラスターは、 setup_ray_cluster呼び出しが中断されるまで稼働しています。 シングルユーザーの Ray クラスターのような自動シャットダウン タイムアウトはありません。

Ray GPUクラスターを作成する

GPU クラスターの場合、これらのリソースは次の方法で Ray クラスターに追加できます。

from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
  min_worker_nodes=2,
  max_worker_nodes=4,
  num_cpus_per_node=8,
  num_gpus_per_node=1,
  num_cpus_head_node=8,
  num_gpus_head_node=1,
  collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray クライアントを使用してリモート Ray クラスターに接続する

Rayバージョン2.3.0以降では、setup_ray_cluster APIを使用してRayクラスターを作成でき、同じウィンドウでray.init()を呼び出すことができます。 この Ray クラスターに接続するための API。 リモート接続文字列を取得するには、次を使用します。

from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

次に、上記のリモート接続文字列を使用してリモート クラスターに接続できます。

import ray
ray.init(remote_conn_str)

Ray クライアントは、ray.data モジュールで定義されている Ray データセット API をサポートしていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内にラップすることができます。

import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
    p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
    ds = ray.data.from_pandas(p1)
    return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit  --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

構成する必要がある値は、 https://で始まる Databricks ワークスペース URL と、Ray クラスターの起動後に表示される Ray ダッシュボード プロキシ URL の/driver-proxy/o/の後の値です。

Ray ジョブCLI 、外部システムから Ray クラスターにジョブを送信するために使用されますが、 Databricks上の Ray クラスターにジョブを送信する場合には必要ありません。 ジョブは Databricks ジョブを使用してデプロイし、アプリケーションごとに Ray クラスターを作成し、Databricks アセット バンドルやワークフロー トリガーなどの既存の Databricks ツールを使用してジョブをトリガーすることをお勧めします。

ログの出力場所を設定する

引数collect_log_to_pathを設定すると、Ray クラスター ログを収集する宛先パスを指定できます。 ログ収集は、Ray クラスターがシャットダウンされた後に実行されます。

Databricks/dbfs/Unity Catalog、Apache Spark クラスターを終了した場合でもログを保持するために、 で始まるパスまたは ボリューム パスを設定することをお勧めします。そうしないと、クラスターがシャットダウンされたときにクラスター上のローカル ストレージが削除されるため、ログを回復できなくなります。

Ray クラスターを作成したら、ノートブック内で任意の Ray アプリケーション コードを直接実行できます。 クラスターの Rayダッシュボードを表示するには、「Ray クラスター ダッシュボードを新しいタブで開く」をクリックします。

スタック トレースとフレーム グラフを Ray ダッシュボード アクタ ページで有効にする

[Ray Dashboard Actors (レイ ダッシュボード アクタ)] ページでは、アクティブな Ray アクタのスタック トレースとフレーム グラフを表示できます。 この情報を表示するには、Ray クラスターを起動する前に、次のコマンドを使用して py-spy をインストールします。

%pip install py-spy

ベスト プラクティスの作成と構成

このセクションでは、Ray クラスターの作成と構成に関するベスト プラクティスについて説明します。

GPU 以外のワークロード

Ray クラスターは、Databricks Spark クラスター上で実行されます。 一般的なシナリオは、Spark ジョブと Spark UDF を使用して、GPU リソースを必要としない単純なデータ前処理タスクを実行することです。 次に、Ray を使用して、GPU のメリットを享受できる複雑な機械学習タスクを実行します。 DatabricksApache SparkApache SparkDataFrameこの場合、 では、すべての 変換とApache SparkUDF 実行で GPU リソースが使用されないように、 クラスター レベル構成 パラメーター spark.タスク. リソース.gpu.amount を 0 に設定することをお勧めします。

この構成の利点は次のとおりです。

  • GPU インスタンス タイプには通常、GPU デバイスよりも多くの CPU コアがあるため、Apache Spark ジョブの並列処理が向上します。

  • Apache Spark クラスターが複数のユーザーと共有されている場合、この構成により、Apache Spark ジョブが同時に実行されている Ray ワークロードと GPU リソースを競合することがなくなります。

Ray タスクで使用する場合は、 transformersトレーナー MLflow 統合を無効にします

transformersトレーナー MLflow 統合は、 transformersライブラリ内からデフォルトで有効になっています。 Ray トレーニングを使用してtransformersモデルを微調整すると、資格情報の問題により Ray タスクは失敗します。 ただし、トレーニングに MLflow を直接使用する場合は、この問題は発生しません。 この問題を回避するには、Apache Spark クラスターを起動するときに、Databricks クラスター構成内からDISABLE_MLFLOW_INTEGRATION環境変数を 'TRUE' に設定します。

Ray リモート関数の pickle 化エラーに対処する

Rayタスクを実行するために、Rayはタスク関数をpickle化します。 pickle 化に失敗した場合は、コードのどの部分が失敗の原因であるかを診断する必要があります。 pickle 化エラーの一般的な原因は、外部参照、クロージャ、およびステートフルオブジェクトへの参照の処理です。 最も簡単に検証してすぐに修正できるエラーの 1 つは、タスク関数宣言内でインポート ステートメントを移動することによって修正できます。

たとえば、 datasets.load_dataset 、Databricks Runtime ドライバー側でパッチが適用され、参照を非ピクル化可能にする、広く使用されている関数です。 これを解決するには、タスク関数を次のように記述するだけです。

def ray_task_func():
  from datasets import load_dataset  # import the function inside task function
  ...

Ray タスクがメモリ不足 (OOM) エラーで予期せず終了した場合は、Ray メモリ モニターを無効にします。

Ray 2.9.3 では、Ray メモリ モニターに、Ray タスクが理由もなく誤って停止される可能性がある既知の問題がいくつかあります。 この問題を解決するには、Apache Spark クラスターの起動時に、Databricks クラスター構成内で環境変数RAY_memory_monitor_refresh_ms0に設定して、Ray メモリ モニターを無効にすることができます。

RayからSparkデータを読み取る

一般的な使用例は、Spark DataFrame からデータを Ray に読み込んでさらに処理することです。 Databricks Runtime 15.0 for ML 以降では、Spark DataFrame 内に含まれるデータを Ray に直接簡単に読み込むことができる関数が利用できます。

この機能を効果的に使用するには、 ray.init()を実行して Ray クラスターを起動する前に、Spark クラスター構成spark.databricks.pyspark.dataFrameChunk.enabledtrueに設定されていることを確認してください。

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

Ray は、データを一時的に書き込む必要なく Spark DataFrame の内容を直接取得し、直接処理できるようにします。

SparkからRayデータを読み取る

Ray から Spark データを読み取るのと同様に、Unity Catalog を使用して、Ray タスクの結果を Spark DataFrame として読み取る機能がサポートされています。 この機能を使用するには、Unity Catalog 対応ワークスペース内から Databricks Runtime 15.0 for ML 以降を実行している必要があります。

この機能を使用するには、 環境変数 "_RAY_UC_VOLUMES_FUST_TEMP_DIR" が有効でアクセス可能なUnity Catalog Volume パスに設定されていることを確認してください。 "/Volumes/MyCatalog/MySchema/MyVolume/MyRayData"

import ray.data

source_table = "my_database.my_table"

spark_dataframe = spark.read.table(source_table)
ray_dataset = ray.data.from_spark(spark_dataframe)

# Write to the specified (via environment variable) UC Volume from Ray
ray_dataset.write_databricks_table()

Databricks Runtimeの 15.0 より前のML バージョンでは、 モジュールの RayParquet ライターray_dataset.write_parquet() を使用して、オブジェクトray.data ストアの場所に直接書き込むことができます。Sparkネイティブ リーダーを使用してこのParquetデータを読み取ることができます。

データのバッチに変換関数を適用する

データをバッチ処理する場合は、 map_batches関数を使用して Ray Data API を使用することをお勧めします。 このアプローチは、特にバッチ処理のメリットを享受できる大規模なデータセットや複雑な計算の場合、より効率的でスケーラブルになります。 任意のSpark DataFrame 、ray.data.from_spark APIを使用して Ray データセットに変換できます。 この変換 API を呼び出して処理された出力は、API ray.data.write_databricks_tableを使用して Databricks UC テーブルに書き出すことができます。

Ray タスクでの MLflow の使用

Ray タスクで MLflow を使用するには、次のことを行う必要があります。

  • Ray タスク内で Databricks MLflow 資格情報を定義します。

  • Apache Spark ドライバー内で MLflow の実行を作成し、作成した run_id を Ray タスクに渡します。

次のコード例は、その方法を示しています。

import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")

experiment_name = "/Users/<your-name> <Databricks>.com/mlflow_test"
mlflow.set_experiment(experiment_name)

@ray.remote
def ray_task(x, run_id):
  import os
  os.environ.update(mlflow_db_creds)
  mlflow.set_experiment(experiment_name)
  # We need to use the run created in <AS> driver side,
  # and set `nested=True` to make it a nested run inside the
  # parent run.
  with mlflow.start_run(run_id=run_id, nested=True):
    mlflow.log_metric(f"task_{x}_metric", x)
  return x

with mlflow.start_run() as run:  # create MLflow run in <AS> driver side.
  results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])

Ray タスクでノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用する

現在、Ray には、Ray タスクがノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用できないという既知の問題があります。 Ray ジョブ内で追加の依存関係を利用するには、タスク内でこれらの依存関係を使用する Ray-on-Spark クラスターを起動する前に、 %pipマジック コマンドを使用してライブラリを手動でインストールする必要があります。 たとえば、Ray クラスターの起動に使用する Ray のバージョンを更新するには、ノートブックで次のコマンドを実行します。

%pip install ray==<The Ray version you want to use> --force-reinstall

次に、ノートブックで次のコマンドを実行して、Python カーネルを再起動します。

dbutils.library.restartPython()