Databricks で Ray クラスターを作成して接続する
Databricksで Ray コンピュート クラスターを作成、構成、実行する方法を学びます
要件
Ray クラスターを作成するには、次の設定でDatabricks汎用コンピュート リソースにアクセスできる必要があります。
Databricks Runtime 12.2 LTS ML 以上。
次のいずれかの アクセスモードを使用します。
シングルユーザーアクセスモード
専用アクセス モード (Databricks Runtime 15.4 LTS ML 以降が必要)
分離なし共有アクセス モード
注:
Ray クラスターは現在、サーバーレス コンピュートではサポートされていません。
Rayのインストール
Databricks Runtime ML 15.0 以降では、Ray が Databricks クラスターにプリインストールされます。
15.0 より前にリリースされたランタイムの場合は、pip を使用してクラスターに Ray をインストールします。
%pip install ray[default]>=2.3.0
Databricks クラスターにユーザー固有の Ray クラスターを作成する
Ray クラスターを作成するには、 ray.util.spark.setup_ray_cluster APIです。
注:
ノートブックに Ray クラスターを作成すると、現在のノートブック ユーザーのみが使用できるようになります。 ノートブックがクラスターから切り離された後、または 30 分間非アクティブ状態が続いた後 (Ray にタスクが送信されていない)、Ray クラスターは自動的にシャットダウンされます。 すべてのユーザーと共有され、アクティブに実行されているノートブックの影響を受けない Ray クラスターを作成する場合は、代わりにray.util.spark.setup_global_ray_cluster
API を使用します。
固定サイズのレイクラスター
Databricks クラスターに接続されている任意の Databricks ノートブックで、次のコマンドを実行して固定サイズの Ray クラスターを起動できます。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
自動スケーリング Ray クラスター
自動スケーリング Ray クラスターを開始する方法については、 Databricksでの Ray クラスターのスケーリング」を参照してください。
グローバルモードの開始 Ray クラスター
Ray 2.9.0 以降を使用すると、Databricks クラスター上にグローバル モードの Ray クラスターを作成できます。 グローバル モードの Ray クラスターでは、Databricks クラスターに接続されているすべてのユーザーも Ray クラスターを使用できます。 この Ray クラスターの実行モードには、単一ユーザー Ray クラスター インスタンスを実行するときに 1 つのユーザー クラスターが持つアクティブなタイムアウト機能はありません。
複数のユーザーが接続して Ray タスクを実行できるグローバル Ray クラスターを開始するには、まず Databricks ノートブック ジョブを作成し、それを共有モードの Databricks クラスターに接続してから、次のコマンドを実行します。
from ray.util.spark import setup_global_ray_cluster
setup_global_ray_cluster(
max_worker_nodes=2,
...
# other arguments are the same as with the `setup_global_ray` API.
)
これは、ノートブック コマンド セルの [中断] ボタンをクリックするか、ノートブックを Databricks クラスターからデタッチするか、Databricks クラスターを終了することによって呼び出しを中断するまでアクティブなままになるブロッキング呼び出しです。 それ以外の場合、グローバル モードの Ray クラスターは引き続き実行され、承認されたユーザーによるタスクの送信が可能になります。 グローバル モード クラスターの詳細については、 Ray APIドキュメントを参照してください。
グローバル モード クラスターには次のプロパティがあります。
Databricks クラスターでは、一度に作成できるアクティブなグローバル モード Ray クラスターは 1 つだけです。
Databricks クラスターでは、アクティブなグローバル モードの Ray クラスターは、接続されているすべての Databricks ノートブック内のすべてのユーザーが使用できます。
ray.init()
を実行すると、アクティブなグローバル モードの Ray クラスターに接続できます。 この Ray クラスターには複数のユーザーがアクセスできるため、リソースの競合が問題になる可能性があります。グローバル モードの Ray クラスターは、
setup_ray_cluster
呼び出しが中断されるまで稼働しています。 シングル ユーザー Ray クラスターのような自動シャットダウン タイムアウトはありません。
Ray GPUクラスターを作成する
GPU クラスターの場合、これらのリソースは次の方法で Ray クラスターに追加できます。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=8,
num_gpus_per_node=1,
num_cpus_head_node=8,
num_gpus_head_node=1,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
Ray クライアントを使用してリモート Ray クラスターに接続する
Rayバージョン2.3.0以降では、setup_ray_cluster APIを使用してRayクラスターを作成でき、同じウィンドウでray.init()を呼び出すことができます。 この Ray クラスターに接続するための API。 リモート接続文字列を取得するには、次を使用します。
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
次に、上記のリモート接続文字列を使用してリモート クラスターに接続できます。
import ray
ray.init(remote_conn_str)
Ray クライアントは、ray.data モジュールで定義されている Ray データセット API をサポートしていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内にラップすることができます。
import ray
import pandas as pd
# Note: This must be run in the same VPC/network as the Spark cluster
# so it can reach this address
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
## Connecting the Ray Cluster to the Ray Job CLI
For many developers moving from self-managed Ray solutions to a <Databricks> solution, there is often existing infrastructure tooling built based on the Ray CLI tools. While <Databricks> currently does not support Ray Cluster CLI integration, the Ray Job CLI can be connected through the driver proxy to the Ray cluster running on <Databricks>. For example:
``` shell
ray job submit --headers '{"cookie" : "DATAPLANE_DOMAIN_SESSIONID=<REDACTED>"}' --address 'https://<DATABRICKS WORKSPACE URL>/driver-proxy/o/<etc>' --working-dir='.' -- python run_task.py
構成する必要がある値は、 https://
で始まる Databricks ワークスペース URL と、Ray クラスターの起動後に表示される Ray ダッシュボード プロキシ URL の/driver-proxy/o/
の後の値です。
Ray ジョブCLI 、外部システムから Ray クラスターにジョブを送信するために使用されますが、 Databricks上の Ray クラスターにジョブを送信する場合には必要ありません。 ジョブは Databricks ジョブを使用してデプロイし、アプリケーションごとに Ray クラスターを作成し、Databricks アセット バンドルやワークフロー トリガーなどの既存の Databricks ツールを使用してジョブをトリガーすることをお勧めします。
ログの出力場所を設定する
引数collect_log_to_path
を設定すると、Ray クラスター ログを収集する宛先パスを指定できます。 ログ収集は、Ray クラスターがシャットダウンされた後に実行されます。
Databricks/dbfs/
Unity Catalog、Apache Spark クラスターを終了した場合でもログを保持するために、 で始まるパスまたは ボリューム パスを設定することをお勧めします。そうしないと、クラスターがシャットダウンされたときにクラスター上のローカル ストレージが削除されるため、ログを回復できなくなります。
Ray クラスターを作成したら、ノートブック内で任意の Ray アプリケーション コードを直接実行できます。 クラスターの Rayダッシュボードを表示するには、「Ray クラスター ダッシュボードを新しいタブで開く」をクリックします。
スタック トレースとフレーム グラフを Ray ダッシュボード アクタ ページで有効にする
[Ray Dashboard Actors (レイ ダッシュボード アクタ)] ページでは、アクティブな Ray アクタのスタック トレースとフレーム グラフを表示できます。 この情報を表示するには、Ray クラスターを起動する前に、次のコマンドを使用して py-spy をインストールします。
%pip install py-spy
ベスト プラクティスの作成と構成
このセクションでは、Ray クラスターの作成と構成に関するベスト プラクティスについて説明します。
GPU 以外のワークロード
Ray クラスターは、Databricks Spark クラスター上で実行されます。 一般的なシナリオは、Spark ジョブと Spark UDF を使用して、GPU リソースを必要としない単純なデータ前処理タスクを実行することです。 次に、Ray を使用して、GPU のメリットを享受できる複雑な機械学習タスクを実行します。 DatabricksApache SparkApache SparkDataFrameこの場合、 では、すべての 変換とApache SparkUDF 実行で GPU リソースが使用されないように、 クラスター レベル構成 パラメーター spark.タスク. リソース.gpu.amount を 0 に設定することをお勧めします。
この構成の利点は次のとおりです。
GPU インスタンス タイプには通常、GPU デバイスよりも多くの CPU コアがあるため、Apache Spark ジョブの並列処理が向上します。
Apache Spark クラスターが複数のユーザーと共有されている場合、この構成により、Apache Spark ジョブが同時に実行されている Ray ワークロードと GPU リソースを競合することがなくなります。
Ray タスクで使用する場合は、 transformers
トレーナー MLflow 統合を無効にします
transformers
トレーナー MLflow 統合は、 transformers
ライブラリ内からデフォルトで有効になっています。 Ray トレーニングを使用してtransformers
モデルを微調整すると、資格情報の問題により Ray タスクは失敗します。 ただし、トレーニングに MLflow を直接使用する場合は、この問題は発生しません。 この問題を回避するには、Apache Spark クラスターを起動するときに、Databricks クラスター構成内からDISABLE_MLFLOW_INTEGRATION
環境変数を 'TRUE' に設定します。
Ray リモート関数の pickle 化エラーに対処する
Rayタスクを実行するために、Rayはタスク関数をpickle化します。 pickle 化に失敗した場合は、コードのどの部分が失敗の原因であるかを診断する必要があります。 pickle 化エラーの一般的な原因は、外部参照、クロージャ、およびステートフルオブジェクトへの参照の処理です。 最も簡単に検証してすぐに修正できるエラーの 1 つは、タスク関数宣言内でインポート ステートメントを移動することによって修正できます。
たとえば、 datasets.load_dataset
、Databricks Runtime ドライバー側でパッチが適用され、参照を非ピクル化可能にする、広く使用されている関数です。 これを解決するには、タスク関数を次のように記述するだけです。
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...
Ray タスクがメモリ不足 (OOM) エラーで予期せず終了した場合は、Ray メモリ モニターを無効にします。
Ray 2.9.3 では、Ray メモリ モニターに、Ray タスクが理由もなく誤って停止される可能性がある既知の問題がいくつかあります。 この問題を解決するには、Apache Spark クラスターの起動時に、Databricks クラスター構成内で環境変数RAY_memory_monitor_refresh_ms
を0
に設定して、Ray メモリ モニターを無効にすることができます。
データのバッチに変換関数を適用する
データをバッチ処理する場合は、 map_batches
関数を使用して Ray Data API を使用することをお勧めします。 このアプローチは、特にバッチ処理のメリットを享受できる大規模なデータセットや複雑な計算の場合、より効率的でスケーラブルになります。 任意のSpark DataFrame 、ray.data.from_spark
APIを使用して Ray データセットに変換できます。 この変換 API を呼び出して処理された出力は、API ray.data.write_databricks_table
を使用して Databricks UC テーブルに書き出すことができます。
Ray チューナー、Ray トレイン、またはカスタマイズされた Ray タスクでの MLflow の使用
Databricks MLflow と Ray を統合するには、Ray 2.41 以降が必要です。
MLflow を Ray Tune、Ray Train、またはカスタマイズされた Ray タスクと共に使用するには、環境変数 (DATABRICKS_HOST
と DATABRICKS_TOKEN
) を設定するか、ray.util.spark.setup_ray_cluster
を呼び出す前に環境変数 DATABRICKS_HOST
、DATABRICKS_CLIENT_ID
、DATABRICKS_CLIENT_SECRET
を設定します。次のコードは、これらの変数を設定する方法を示しています。
import os
from ray.util.spark import setup_ray_cluster
os.environ["DATABRICKS_HOST"] = "https://....databricks.com"
os.environ["DATABRICKS_TOKEN"] = "<your PAT token"
setup_ray_cluster(num_cpus_worker_node=2, num_gpus_worker_node=0, max_worker_nodes=1, min_worker_nodes=1)
Ray タスクでノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用する
ノートブックスコープの Python ライブラリまたはクラスタリング Python ライブラリをリモート Ray タスクで使用するには、Ray 2.12 以降が必要です。
Ray バージョン 2.11 以下には、Ray タスクがノートブック スコープの Python ライブラリまたはクラスタリング Python ライブラリを使用できないという既知の問題があります。 Ray バージョン 2.11 以下では、Ray クラスタリングを開始する前に、 %pip
magic コマンドを使用して、アクティブなセッション内に追加の依存関係を事前にインストールする必要があります。