Databricks でレイを使用する
Ray 2.3.0 以降では、Ray クラスターを作成し、Databricks を使用して Apache Spark クラスター上で Ray アプリケーションを実行できます。 チュートリアルや例など、Ray で機械学習を始める方法については、 Ray のドキュメントを参照してください。 Ray と Apache Spark の統合の詳細については、 「Ray on Spark API ドキュメント」を参照してください。
要件
Databricks Runtime 12.2 LTS ML 以上。
Databricks Runtime クラスターのアクセス モードは、「割り当て済み」モードまたは「分離共有なし」モードのいずれかである必要があります。
レイ のインストール
次のコマンドを使用して、Rayをインストールします。 [default]
拡張機能は、Ray ダッシュボード コンポーネントで必要です。
%pip install ray[default]>=2.3.0
Databricks クラスターにユーザー固有の Ray クラスターを作成する
レイ クラスターを作成するには、 ray.util.spark.setup_ray_cluster APIです。
Databricks クラスターに接続されている Databricks ノートブックでは、次のコマンドを実行できます。
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
num_worker_nodes=2,
num_cpus_worker_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
ray.util.spark.setup_ray_cluster
API は Spark 上に Ray クラスターを作成します。 内部的には、バックグラウンドの Spark ジョブが作成されます。 ジョブ内の各 Spark タスクは Ray ワーカー ノードを作成し、Ray ヘッド ノードはドライバー上に作成されます。 引数num_worker_nodes
は、作成する Ray ワーカー ノードの数を表します。 各 Ray ワーカー ノードに割り当てられる CPU または GPU コアの数を指定するには、引数num_cpus_worker_node
(デフォルト値: 1) またはnum_gpus_worker_node
(デフォルト値: 0) を設定します。
Ray クラスターが作成された後は、任意の Ray アプリケーション コードをノートブックで直接実行できます。 [新しいタブで Ray クラスター ダッシュボードを開く]をクリックして、クラスターの Ray ダッシュボードを表示します。
ヒント
Databricks のシングル ユーザー クラスターを使用している場合は、 num_worker_nodes
をray.util.spark.MAX_NUM_WORKER_NODES
に設定して、Ray クラスターで使用可能なすべてのリソースを使用できます。
setup_ray_cluster(
# ...
num_worker_nodes=ray.util.spark.MAX_NUM_WORKER_NODES,
)
引数collect_log_to_path
を設定して、Ray クラスター ログを収集する宛先パスを指定します。 ログ収集は、Ray クラスターがシャットダウンされた後に実行されます。 Databricks では、Spark クラスターを終了した場合でもログが保持されるように、 /dbfs/
で始まるパスを設定することをお勧めします。 そうしないと、クラスターがシャットダウンされたときにクラスター上のローカル ストレージが削除されるため、ログを回復できなくなります。
注
「作成された Ray クラスターを Ray アプリケーションで自動的に使用するには、 ray.util.spark.setup_ray_cluster
を呼び出して、 RAY_ADDRESS
環境変数を Ray クラスターのアドレスに設定します。」 ray.init API のaddress
引数を使用して、代替クラスター アドレスを指定できます。
レイ アプリケーション を実行する
Ray クラスターが作成されたら、Databricks ノートブックで任意の Ray アプリケーション コードを実行できます。
重要
Databricks では、アプリケーションに必要なライブラリを %pip install <your-library-dependency>
を使用してインストールし、それに応じて Ray クラスターとアプリケーションで使用できるようにすることをお勧めします。 Ray init 関数呼び出しで依存関係を指定すると、Spark ワーカー ノードにアクセスできない場所に依存関係がインストールされるため、バージョンの非互換性とインポート エラーが発生します。
たとえば、次のように Databricks ノートブックで単純な Ray アプリケーションを実行できます。
import ray
import random
import time
from fractions import Fraction
ray.init()
@ray.remote
def pi4_sample(sample_count):
"""pi4_sample runs sample_count experiments, and returns the
fraction of time it was inside the circle.
"""
in_count = 0
for i in range(sample_count):
x = random.random()
y = random.random()
if x*x + y*y <= 1:
in_count += 1
return Fraction(in_count, sample_count)
SAMPLE_COUNT = 1000 * 1000
start = time.time()
future = pi4_sample.remote(sample_count=SAMPLE_COUNT)
pi4 = ray.get(future)
end = time.time()
dur = end - start
print(f'Running {SAMPLE_COUNT} tests took {dur} seconds')
pi = pi4 * 4
print(float(pi))
オートスケール モードで Ray クラスターを作成する
Ray 2.8.0 以降では、Ray クラスターはDatabricksで開始され、 Databricksオートスケールとの統合をサポートしています。 Databricksクラスター オートスケールを参照してください。
Ray 2.8.0 以降では、ワークロードに応じたスケールアップまたはスケールダウンをサポートする Ray クラスターを Databricks クラスター上に作成できます。 このオートスケール統合により、Databricks 環境内で内部的に Databricks クラスターのオートスケールがトリガーされます。
オートスケールを有効にするには、次のコマンドを実行します。
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
num_worker_nodes=8,
autoscale=True,
... # other arguments
)
オートスケールが有効な場合、 num_worker_nodes
Ray ワーカー ノードの最大数を示します。 Ray ワーカー ノードのデフォルトの最小数は 0 です。このデフォルト設定は、Ray クラスターがアイドル状態のときに、Ray ワーカー ノードが 0 個にスケールダウンされることを意味します。 これは、すべてのシナリオで高速応答性を実現するのに理想的ではない可能性がありますが、有効にすると、コストを大幅に削減できます。
オートスケールモードでは、 num_worker_nodes
をray.util.spark.MAX_NUM_WORKER_NODES
に設定できません。
次の引数は、アップスケーリングとダウンスケーリングの速度を設定します。
autoscale_upscaling_speed
保留中にできるノードの数を、現在のノード数の倍数で表します。 値が大きいほど、アップスケーリングがアグレッシブになります。 たとえば、これが 1.0 に設定されている場合、クラスターのサイズはいつでも最大 100% 増加する可能性があります。autoscale_idle_timeout_minutes
アイドル状態のワーカー ノードがオートスケーラーによって削除されるまでに経過する必要がある分数を表します。 値が小さいほど、ダウンスケーリングがアグレッシブになります。
Ray 2.9.0 以降では、 autoscale_min_worker_nodes
を設定して、Ray クラスターがアイドル状態のときに、Ray クラスターがワーカー数ゼロにスケールダウンしないようにすることもできます。
Ray クライアントを使用してリモート Ray クラスターに接続する
Ray 2.9.3 では、 setup_ray_cluster
API を呼び出して Ray クラスターを作成します。 同じノートブックで、 ray.init()
API を呼び出してこの Ray クラスターに接続します。
グローバル モードではない Ray クラスターの場合は、次のコードを使用してリモート接続文字列を取得します。
次のコマンドを使用してリモート接続文字列を取得します。
from ray.util.spark import setup_ray_cluster
_, remote_conn_str = setup_ray_cluster(num_worker_nodes=2, ...)
次のリモート接続文字列を使用してリモート クラスターに接続します。
import ray
ray.init(remote_conn_str)
Ray クライアントは、 ray.data
モジュールで定義された Ray データセット API をサポートしていません。 回避策として、次のコードに示すように、Ray データセット API を呼び出すコードをリモート Ray タスク内にラップできます。
import ray
import pandas as pd
ray.init("ray://<ray_head_node_ip>:10001")
@ray.remote
def ray_data_task():
p1 = pd.DataFrame({'a': [3,4] * 10000, 'b': [5,6] * 10000})
ds = ray.data.from_pandas(p1)
return ds.repartition(4).to_pandas()
ray.get(ray_data_task.remote())
Spark DataFrame からデータを読み込む
SparkDataFrameRay データセットとして読み込むには、まず、SparkDataFrame 形式で UC ボリュームまたはDatabricks Filesystem (非推奨)Parquet に保存する必要があります。Databricks Filesystemへのアクセスを安全に制御するために、 Databricksではクラウド オブジェクト ストレージをDBFSにマウントすることをお勧めします。 次に、次のヘルパー メソッドを使用して、保存された Spark DataFrame パスからray.data.Dataset
インスタンスを作成できます。
import ray
import os
from urllib.parse import urlparse
def create_ray_dataset_from_spark_dataframe(spark_dataframe, dbfs_tmp_path):
spark_dataframe.write.mode('overwrite').parquet(dbfs_tmp_path)
fuse_path = "/dbfs" + urlparse(dbfs_tmp_path).path
return ray.data.read_parquet(fuse_path)
# For example, read a Delta Table as a Spark DataFrame
spark_df = spark.read.table("diviner_demo.diviner_pedestrians_data_500")
# Provide a dbfs location to write the table to
data_location_2 = (
"dbfs:/home/example.user@databricks.com/data/ray_test/test_data_2"
)
# Convert the Spark DataFrame to a Ray dataset
ray_dataset = create_ray_dataset_from_spark_dataframe(
spark_dataframe=spark_df,
dbfs_tmp_path=data_location_2
)
Databricks SQL ウェアハウスを介してUnity Catalogテーブルからデータをロードする
Ray 2.8.0 以降の場合、 ray.data.read_databricks_tables
API を呼び出して Databricks Unity Catalog テーブルからデータをロードできます。
まず、DATABRICKS_TOKEN
の ユーザー名 をDatabricksトークンに設定する必要があります。 プログラムを Databricks Runtime で実行していない場合は、次に示すように、 DATABRICKS_HOST
環境変数を Databricks ワークスペース URL に設定します。
export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net
次に、 ray.data.read_databricks_tables()
を呼び出して Databricks SQL ウェアハウスから読み取ります。
import ray
ray_dataset = ray.data.read_databricks_tables(
warehouse_id='...', # Databricks SQL warehouse ID
catalog='catalog_1', # Unity catalog name
schema='db_1', # Schema name
query="SELECT title, score FROM movie WHERE year >= 1980",
)
Ray ヘッド ノードによって使用されるリソースを構成する
デフォルトでは、Ray on Spark 構成の場合、Databricks は Ray ヘッド ノードに割り当てられるリソースを次のように制限します。
0 CPU コア
0 GPU
128 MB のヒープ メモリ
128 MB のオブジェクト ストア メモリ
これは、Ray ヘッド ノードが通常、Ray タスクの実行ではなく、グローバル調整のために使用されるためです。 Spark ドライバー ノードのリソースは複数のユーザーと共有されるため、デフォルト設定では Spark ドライバー側のリソースが節約されます。
Ray 2.8.0 以降では、Ray ヘッド ノードによって使用されるリソースを構成できます。 setup_ray_cluster
API で次の引数を使用します。
num_cpus_head_node
: Rayヘッドノードで使用するCPUコアの設定num_gpus_head_node
:レイヘッドノードが使用するGPUの設定object_store_memory_head_node
: Ray headノードによるオブジェクトストアのメモリサイズ設定
異種クラスターのサポート
トレーニングをより効率的かつコスト効率よく実行するには、Ray on Spark クラスターを作成し、Ray ヘッド ノードと Ray ワーカー ノード間で異なる構成を設定できます。 ただし、すべての Ray ワーカー ノードは同じ構成である必要があります。 Databricks クラスターは異種クラスターを完全にはサポートしませんが、クラスター ポリシーを設定することで、異なるドライバー インスタンス タイプとワーカー インスタンス タイプを使用して Databricks クラスターを作成できます。
例:
{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}
レイ クラスター構成 の調整
各 Ray ワーカー ノードの推奨構成は次のとおりです。
レイ ワーカー ノードあたり最低 4 つの CPU コア。
各 Ray ワーカー ノードの最小 10 GB ヒープ メモリ。
ray.util.spark.setup_ray_cluster
を呼び出す場合、Databricks ではnum_cpus_worker_node
を値 >= 4
に設定することをお勧めします。
各 Ray ワーカーノードのヒープメモリの調整の詳細については、「 Ray ワーカーノードのメモリ割り当て 」を参照してください。
レイワーカーノードの メモリ割り当て
各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。 各タイプに割り当てられたメモリサイズは、以下のように決定されます。
各 Ray ワーカー ノードに割り当てられる合計メモリは次のとおりです。
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
は、Spark ワーカー ノードで起動できる Ray ワーカー ノードの最大数です。 これは、引数 num_cpus_worker_node
または num_gpus_worker_node
によって決定されます。
引数 object_store_memory_per_node
を設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは次のようになります。
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
引数を設定すると、次の object_store_memory_per_node
になります。
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
さらに、Ray ワーカー ノードあたりのオブジェクト ストア メモリ サイズは、オペレーティング システムの共有メモリによって制限されます。 最大値は次のとおりです。
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
は、Spark ワーカー ノード用に構成された /dev/shm
ディスク サイズです。
おすすめの方法
各 Ray ワーカー ノードの CPU / GPU 数を設定するにはどうすればよいでしょうか?
Databricks では、 num_cpus_worker_node
Spark ワーカー ノードあたりの CPU コアの数に設定し、 num_gpus_worker_node
Spark ワーカー ノードあたりの GPU の数に設定することをお勧めします。 この構成では、各 Spark ワーカー ノードは、Spark ワーカー ノードのリソースを完全に活用する 1 つの Ray ワーカー ノードを起動します。
GPU クラスター構成
Ray クラスターは、Databricks Spark クラスター上で実行されます。 一般的なシナリオとしては、Spark ジョブと Spark UDF を使用して、GPU リソースを必要としない単純なデータ前処理タスクを実行し、Ray を使用して GPU のメリットを享受できる複雑な機械学習タスクを実行することが挙げられます。 この場合、Databricks では、すべての Spark DataFrame 変換と Spark UDF 実行で GPU リソースが使用されないように、Spark クラスター レベルの構成パラメーターspark.task.resource.gpu.amount
を0
に設定することをお勧めします。
この構成の利点は次のとおりです。
通常、GPU インスタンス タイプには GPU デバイスよりも多くの CPU コアが搭載されているため、Spark ジョブの並列処理が増加します。
Spark クラスターが複数のユーザーと共有されている場合、この構成により、Spark ジョブが同時に実行されている Ray ワークロードと GPU リソースをめぐって競合することがなくなります。
Ray タスクで使用する場合は、 transformers
トレーナー mlflow 統合を無効にします
transformers
トレーナー MLflow 統合はデフォルトでオンになっています。 Ray トレーニングする を使用してトレーニングすると、Ray タスクに対してDatabricks MLflowサービス 資格情報が構成されていないため、Ray タスクは失敗します。
この問題を回避するには、Databricks クラスター構成でDISABLE_MLFLOW_INTEGRATION
環境変数を 'TRUE' に設定します。 Ray トレーナー タスクでMLflowにログインする方法の詳細については、「Ray タスクでMLflowを使用する」セクションを参照してください。
Ray リモート関数の pickle 化エラーに対処
Rayタスクを実行するために、Rayはpickleを使用してタスク関数をシリアル化します。 pickle 化に失敗した場合は、コード内でエラーが発生した行を特定します。 多くの場合、 import
コマンドをタスク関数に移動すると、一般的なピクル化エラーに対処できます。 たとえば、 datasets.load_dataset
広く使用されている関数ですが、Databricks Runtime 内でパッチが適用されているため、外部インポートが pickle 化できなくなる可能性があります。 この問題を修正するには、次のようにコードを更新します。
def ray_task_func():
from datasets import load_dataset # import the function inside task function
...
Ray タスクが OOM エラーで予期せず終了した場合は Ray メモリ モニターを無効にします。
Ray 2.9.3 では、Ray メモリ モニターに、Ray タスクが誤って終了される原因となる既知の問題があります。
この問題を解決するには、Databricks クラスター構成で環境変数RAY_memory_monitor_refresh_ms
を0
に設定して、Ray メモリ モニターを無効にします。
Spark と Ray のハイブリッド ワークロードのメモリ リソース構成
クラスターでハイブリッド および Ray ワークロードを実行する場合、SparkDatabricksDatabricks Sparkspark.executor.memory 4g
、Databricks クラスター構成で を設定するなど、 エグゼキューターのメモリを小さな値に減らすことを推奨します。Sparkこれは、ガベージ コレクションJava (GC) を遅延トリガーする プロセス内で エグゼキューターが実行されているためです。Spark データセットのキャッシュに対するメモリ負荷はかなり高く、Ray が使用できるメモリが減少します。 潜在的なOOMエラーを回避するために、 Databricks構成された「spark.エグゼキューター.memory」を減らすことを推奨しています。 値をデフォルトより小さい値に設定します。
Spark と Ray のハイブリッド ワークロードの計算リソース構成
Databricks クラスターでハイブリッド Spark および Ray ワークロードを実行する場合は、Spark クラスター ノードを自動スケーリング可能に設定するか、Ray ワーカー ノードを自動スケーリング可能に設定するか、または両方で自動スケーリングを有効にします。
たとえば、 Databricksクラスター内のワーカー ノードの数が固定されている場合は、Ray ワークロードが実行されていないときに Ray クラスターがスケールダウンするように、Ray-on- Sparkオートスケールを有効にすることを検討してください。 その結果、アイドル状態のクラスター リソースが解放され、Spark ジョブが使用できるようになります。
Spark ジョブが完了し、Ray ジョブが開始すると、処理要求を満たすために Ray-on-Spark クラスターがスケールアップされます。
Databricks クラスターと Ray-on-spark クラスターの両方を自動スケーラブルにすることもできます。 具体的には、Databricks クラスターの自動スケーラブル ノードを最大 10 ノードに、Ray-on-Spark ワーカー ノードを最大 4 ノード (Spark ワーカーごとに 1 つの Ray ワーカー ノード) に構成して、Spark が Spark タスクに最大 6 ノードを自由に割り当てることができるようにします。 つまり、Ray ワークロードは最大 4 つのノード リソースを同時に使用できますが、Spark ジョブは最大 6 つのノード相当のリソースを割り当てることができます。
データのバッチに変換関数を適用する
データをバッチで処理する場合、Databricks では、 map_batches
関数を備えた Ray Data API を使用することをお勧めします。 このアプローチは、特に大規模なデータセットの場合や、バッチ処理のメリットを享受できる複雑な計算を実行する場合に、より効率的でスケーラブルになります。 任意の Spark DataFrame は、 ray.data.from_spark
API を使用して Ray データに変換でき、API ray.data.write_databricks_table
を使用して Databricks UC テーブルに書き出すことができます。
Ray でMLflowを使用するタスク
Ray タスクで MLflow を使用するには、以下を構成します。
Ray タスクにおける Databricks MLflow 資格情報
MLflow は Spark ドライバー側で実行され、生成された
run_id
値を Ray タスクに渡します。
次のコードは一例です。
import mlflow
import ray
from mlflow.utils.databricks_utils import get_databricks_env_vars
mlflow_db_creds = get_databricks_env_vars("databricks")
experiment_name = "/Users/<your-name>@databricks.com/mlflow_test"
mlflow.set_experiment(experiment_name)
@ray.remote
def ray_task(x, run_id):
import os
os.environ.update(mlflow_db_creds)
mlflow.set_experiment(experiment_name)
# We need to use the run created in Spark driver side,
# and set `nested=True` to make it a nested run inside the
# parent run.
with mlflow.start_run(run_id=run_id, nested=True):
mlflow.log_metric(f"task_{x}_metric", x)
return x
with mlflow.start_run() as run: # create MLflow run in Spark driver side.
results = ray.get([ray_task.remote(x, run.info.run_id) for x in range(10)])
Ray タスクでノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用する
現在、Ray には、Ray タスクがノートブック スコープの Python ライブラリまたはクラスター Python ライブラリを使用できないという既知の問題があります。 この制限に対処するには、Ray-on-Spark クラスターを起動する前に、ノートブックで次のコマンドを実行します。
%pip install ray==<The Ray version you want to use> --force-reinstall
次に、ノートブックで次のコマンドを実行して、Python カーネルを再起動します。
dbutils.library.restartPython()
スタック トレースとフレーム グラフを Ray ダッシュボード アクタ ページで 有効にする
[Ray Dashboard Actors (レイ ダッシュボード アクタ )] ページでは、アクティブな Ray アクタのスタック トレースとフレーム グラフを表示できます。
この情報を表示するには、Ray クラスターを起動する前にpy-spy
をインストールしてください。
%pip install py-spy
Ray クラスター をシャットダウンする
Databricksで実行されているRayクラスターをシャットダウンするには、 ray.utils.spark.shutdown_ray_clusterを呼び出します。 API。
注
Ray クラスターは、次の場合にもシャットダウンします。
対話型ノートブックを Databricks クラスターからデタッチします。
Databricks ジョブ が完了します。
Databricks クラスターが再起動または終了します。
指定されたアイドル時間のアクティビティはありません。
制限
マルチユーザー共有 Databricks クラスター (分離モードが有効) はサポートされていません。
%pip を使用してパッケージをインストールすると、レイ クラスターがシャットダウンします。 %pipを使用してすべてのライブラリのインストールが完了したら、必ずRayを起動してください。
ray.util.spark.setup_ray_cluster
から設定を上書きする統合を使用すると、Ray クラスターが不安定になり、Ray コンテキストがクラッシュする可能性があります。たとえば、xgboost_ray
パッケージを使用し、アクターまたはcpus_per_actor
構成でRayParams
をレイ クラスター構成を超えて設定すると、レイ クラスターがサイレントにクラッシュする可能性があります。