メインコンテンツまでスキップ

Databricks で Ray クラスターを作成して接続する

Databricks で Ray コンピュート クラスターを作成、設定、実行する方法を学習します

必要条件

Ray クラスターを作成するには、次の設定で Databricks 汎用コンピュート リソースにアクセスできる必要があります。

  • Databricks Runtime 12.2 LTS ML 以降。
  • 専用 (以前のシングル ユーザー) または分離なしの共有 アクセス モード:
注記

Ray クラスターは、現在、サーバレス コンピュートではサポートされていません。

Rayをインストールしてください

Databricks Runtime ML 15.0 以降では、Ray は Databricks クラスターにプリインストールされています。

15.0 より前にリリースされたランタイムの場合は、pip を使用してクラスターに Ray をインストールします。

%pip install ray[default]>=2.3.0

Databricks クラスターでユーザー固有の Ray クラスターを作成する

Ray クラスターを作成するには、 ray.util.spark.setup_ray_cluster APIです。

注記

ノートブックで Ray クラスターを作成すると、現在のノートブック ユーザーのみが使用できます。 Ray クラスターは、ノートブックがクラスターから切り離された後、または 30 分間非アクティブな状態が続いた後 (Ray にタスクが送信されていない) 後に自動的にシャットダウンされます。 すべてのユーザーと共有され、アクティブに実行されているノートブックの影響を受けない Ray クラスターを作成する場合は、代わりに ray.util.spark.setup_global_ray_cluster API を使用します。

fixed-size Ray クラスター

Databricks クラスターに接続されている任意の Databricks ノートブックで、次のコマンドを実行して固定サイズの Ray クラスターを開始できます。

Python
import ray
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
max_worker_nodes=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Auto-scaling Ray クラスター

Auto Scaling Ray クラスターを開始する方法については、「Databricksでの Ray クラスターのスケーリング」を参照してください。

グローバルモードの開始 Ray クラスター

Ray 2.9.0 以降を使用すると、 Databricks クラスター上にグローバル モードの Ray クラスターを作成できます。 グローバル モードの Ray クラスターを使用すると、 Databricks コンピュート リソースに接続されているすべてのユーザーが Ray クラスターも使用できます。 この Ray クラスターの実行モードには、単一ユーザーの Ray クラスターインスタンスを実行するときに専用のコンピュート リソースが持つアクティブなタイムアウト機能はありません。

複数のユーザーが Ray タスクにアタッチして実行できるグローバル レイ クラスターを開始するには、まず Databricks ノートブック ジョブを作成し、それを共有モード クラスター Databricks アタッチしてから、次のコマンドを実行します。

Python
from ray.util.spark import setup_global_ray_cluster

setup_global_ray_cluster(
max_worker_nodes=2,
...
# other arguments are the same as with the `setup_global_ray` API.
)

これは、ノートブック コマンド セルの [割り込み] ボタンをクリックして呼び出しを中断するか、ノートブックを Databricks クラスターから切り離すか、 Databricks クラスターを終了するまでアクティブのままになるブロッキング呼び出しです。 それ以外の場合、グローバル モードの Ray クラスターは引き続き実行され、承認されたユーザーがタスクを送信できるようになります。グローバル モード クラスターの詳細については 、Ray API のドキュメントを参照してください。

グローバル モード クラスターには、次のプロパティがあります。

  • Databricks クラスターでは、一度に作成できるアクティブなグローバル モード Ray クラスターは 1 つだけです。
  • Databricks クラスターでは、アタッチされた任意の Databricks ノートブックのすべてのユーザーが、アクティブなグローバル モードの Ray クラスターを使用できます。ray.init()を実行して、アクティブなグローバル モード Ray クラスターに接続できます。この Ray クラスターには複数のユーザーがアクセスできるため、リソースの競合が問題になる可能性があります。
  • グローバル モードの Ray クラスターは、 setup_ray_cluster コールが中断されるまで有効です。 シングル ユーザー Ray クラスターのような自動シャットダウン タイムアウトはありません。

Ray GPU クラスターを作成する

GPU クラスターの場合、これらのリソースを次の方法で Ray クラスターに追加できます。

Python
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=8,
num_gpus_per_node=1,
num_cpus_head_node=8,
num_gpus_head_node=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

Ray クライアントを使用してリモート Ray クラスターに接続する

Ray バージョン 2.3.0 以降では、setup_ray_cluster APIを使用して Ray クラスターを作成でき、同じノートブックで ray.init() を呼び出すことができます この Ray クラスターに接続するAPI。リモート接続文字列を取得するには、次を使用します。

Python
from ray.util.spark import setup_ray_cluster

_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)

次に、上記のリモート接続文字列を使用してリモートクラスターを接続できます。

Python
import ray
ray.init(remote_conn_str)

Ray クライアントは、ray.data モジュールで定義された Ray データセット API をサポートしていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内にラップできます。

Python
import ray
import pandas as pd

# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")

@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()

ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI

For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:

``` shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py

構成する必要がある値は、https:// で始まる Databricks ワークスペース URL であり、/driver-proxy/o/後に見つかった値は、Ray クラスターの開始後に表示される Ray ダッシュボード プロキシ URL にあります。

Ray ジョブ CLI は、外部システムから Ray クラスターにジョブを送信するために使用されますが、 Databricksの Ray クラスターでジョブを送信するためには必要ありません。 ジョブは Databricks ジョブを使用してデプロイし、アプリケーションごとに Ray クラスターを作成し、Databricks アセットバンドル やワークフロートリガーなどの既存の Databricks ツールを使用してジョブをトリガーすることをお勧めします。

ログ出力場所の設定

引数 collect_log_to_path を設定して、Ray クラスター ログを収集する宛先パスを指定できます。 Ray クラスターがシャットダウンされた後にログ収集が実行される。

Databricks 、/dbfs/または Unity Catalog ボリュームパスで始まるパスを設定して、 Apache Spark クラスターを終了した場合でもログを保持することをお勧めします。 そうしないと、クラスターがシャットダウンされるとクラスター上のローカルストレージが削除されるため、ログを回復できません。

Ray クラスターを作成した後、ノートブックで任意の Ray アプリケーションコードを直接実行できます。 新しいタブで [Open Ray クラスター ダッシュボード ] をクリックして、クラスターの Ray ダッシュボードを表示します。

スタック トレースとフレーム グラフを Ray Dashboard Actors ページで有効にする

[Ray Dashboard Actors (レイ ダッシュボード アクタ)] ページでは、アクティブな Ray アクタのスタック トレースとフレーム グラフを表示できます。 この情報を表示するには、Ray クラスターを開始する前に、次のコマンドを使用して py-spy をインストールします。

Python
%pip install py-spy

ベスト プラクティスの作成と構成

このセクションでは、Ray クラスターを作成および設定するためのベストプラクティスについて説明します。

GPU以外のワークロード

Ray クラスターは、 Databricks Spark クラスター上で実行されます。 一般的なシナリオは、Spark ジョブと Spark UDF を使用して、GPU リソースを必要としない単純なデータ前処理タスクを実行することです。 次に、Ray を使用して、GPU の恩恵を受ける複雑な機械学習タスクを実行します。 この場合、 Databricks Apache Spark クラスター レベルの構成パラメーター spark.task.リソース.gpu.amount を 0 に設定して、すべての Apache Spark データフレーム 変換と Apache Spark UDF 実行で GPU リソースを使用しないようにすることをお勧めします。

この設定の利点は次のとおりです。

  • GPU インスタンスタイプには通常、GPU デバイスよりも多くの CPU コアがあるため、Apache Spark ジョブの並列処理が増加します。
  • Apache Spark クラスターが複数のユーザーと共有されている場合、この設定により、Apache Sparkジョブが同時に実行されている Ray ワークロードと GPU リソースをめぐって競合するのを防ぎます。

transformers トレーナー MLflow 統合を Ray タスクで使用する場合は無効にします

transformers trainer MLflow integration は、transformers ライブラリ内から デフォルト によって有効になります。 Ray train を使用して transformers モデルを微調整すると、資格情報の問題により Ray タスクが失敗します。 ただし、トレーニングに MLflow を直接使用する場合は、この問題は発生しません。この問題を回避するには、 クラスターの開始時に、 クラスター設定内から DISABLE_MLFLOW_INTEGRATION環境変数を 'TRUE'DatabricksApache Spark に設定します。

Address Ray リモート関数の酸洗エラー

Ray タスクを実行するために、Ray はタスク関数をピクルス化します。 pickle 化が失敗した場合は、コードのどの部分が失敗の原因であるかを診断する必要があります。 ピクルス化エラーの一般的な原因は、外部参照、クロージャ、およびステートフルオブジェクトへの参照の処理です。検証して迅速に修正するのが最も簡単なエラーの 1 つは、タスク関数宣言内でインポート ステートメントを移動することで修正できます。

たとえば、 datasets.load_dataset は広く使用されている関数であり、Databricks Runtime ドライバー側で修正プログラムが適用され、参照が pickle 化できなくなります。 これに対処するには、次のようにタスク関数を記述するだけです。

Python
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...

Ray タスクがメモリ不足 (OOM) エラーで予期せず強制終了された場合は、Ray メモリ モニタを無効にします

Ray 2.9.3 では、Ray メモリ モニタにいくつかの既知の問題があり、Ray タスクが原因もなく誤って停止する可能性があります。RAY_memory_monitor_refresh_ms``0この問題に対処するには、Databricks クラスターの開始時に、 クラスター構成内で環境変数 を [] に設定して、Ray メモリApache Spark モニターを無効にします。

データのバッチへの変換関数の適用

データをバッチで処理する場合は、 map_batches 関数と共に Ray Data API を使用することをお勧めします。 このアプローチは、特に大規模なデータセットやバッチ処理の恩恵を受ける複雑な計算の場合に、より効率的でスケーラブルです。 任意の Spark データフレーム は、 ray.data.from_spark API を使用して Ray データセットに変換できます。 この変換 API の呼び出しから処理された出力は、API ray.data.write_databricks_tableを使用して Databricks UC テーブルに書き込むことができます。

Ray チューナー、Ray トレイン、またはカスタマイズされた Ray タスクでの MLflow の使用

Databricks MLflow と Ray を統合するには、Ray 2.41 以降が必要です。

MLflow を Ray Tune、Ray Train、またはカスタマイズされた Ray タスクと共に使用するには、環境変数 (DATABRICKS_HOSTDATABRICKS_TOKEN) を設定するか、ray.util.spark.setup_ray_clusterを呼び出す前に環境変数 DATABRICKS_HOSTDATABRICKS_CLIENT_IDDATABRICKS_CLIENT_SECRET を設定します。次のコードは、これらの変数を設定する方法を示しています。

Python
import os
from ray.util.spark import setup_ray_cluster

os.environ["DATABRICKS_HOST"] = "https://....databricks.com"
os.environ["DATABRICKS_TOKEN"] = "<your PAT token"

setup_ray_cluster(num_cpus_worker_node=2, num_gpus_worker_node=0, max_worker_nodes=1, min_worker_nodes=1)

Use ノートブック スコープの Python ライブラリ または クラスター Python ライブラリ in Ray タスク

ノートブックスコープの Python ライブラリまたはクラスター Python ライブラリをリモート Ray タスクで使用するには、Ray 2.12 以降が必要です。

Ray バージョン 2.11 以下には、Ray タスクがノートブック scoped Python ライブラリまたは クラスター Python ライブラリ. Ray バージョン 2.11 以下では、Ray クラスターを開始する前に、 %pip magic コマンドを使用して、アクティブなセッション内に追加の依存関係を事前にインストールする必要があります。

次のステップ