メインコンテンツまでスキップ

Databricks での Scale Ray クラスター

最適なパフォーマンスを得るために Ray クラスターのサイズを調整する方法 (オートスケール、ヘッド ノード構成、異種クラスター、リソース割り当てなど) について説明します。

オートスケールモードでの Ray クラスターの作成

Ray 2.8.0以降では、Databricksオートスケールとの統合をサポートするDatabricksでRayクラスターが開始されました。このオートスケールの統合によりDatabricksDatabricks環境内で内部的にクラスター オートスケールがトリガーされます。

オートスケールを有効にするには、次のコマンドを実行します。

Ray バージョン 2.10 未満の場合:

Python
from ray.util.spark import setup_ray_cluster

setup_ray_cluster(
num_worker_nodes=8,
autoscale=True,
)

Rayバージョン2.10以降の場合:

Python
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster

setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)

ray.util.spark.setup_ray_cluster API は Ray クラスターを Apache Sparkに作成します。 内部的には、バックグラウンドの Apache Spark ジョブが作成されます。 ジョブ内の各 Apache Spark タスクは Ray ワーカー ノードを作成し、Ray ヘッド ノードはドライバー上に作成されます。 引数 min_worker_nodesmax_worker_nodes は、Ray ワークロード用に作成して使用する Ray ワーカー ノードの範囲を表します。 引数 min_worker_nodes を未定義のままにすると、固定サイズの Ray クラスターが、使用可能なワーカーの数 max_worker_nodes で開始されます。 各 Ray ワーカーノードに割り当てる CPU または GPU コアの数を指定するには、引数 num_cpus_worker_node (デフォルト value: 1) または num_gpus_worker_node (デフォルト value: 0) を設定します。

Ray バージョン 2.10 未満で、オートスケールが有効な場合、 num_worker_nodes は Ray ワーカーノードの最大数を示します。 Ray ワーカー ノードのデフォルトの最小数は 0 です。 このデフォルトの設定は、Ray クラスターがアイドル状態になると、Ray ワーカー ノードがゼロにスケールダウンすることを意味します。 これは、すべてのシナリオで高速な応答性を実現するのに理想的ではないかもしれませんが、有効にするとコストを大幅に削減できます。

オートスケールモードでは、ワーカーを ray.util.spark.MAX_NUM_WORKER_NODESに設定することはできません。

次の引数は、アップスケーリングとダウンスケーリングの速度を構成します。

  • autoscale_upscaling_speed 保留にできるノードの数を、現在のノード数の倍数で表します。 値が大きいほど、アップスケーリングはより積極的になります。 たとえば、これを 1.0 に設定すると、クラスターのサイズはいつでも最大 100% 増加する可能性があります。
  • autoscale_idle_timeout_minutes オートスケーラーがアイドル状態のワーカー ノードを削除するまでに経過する必要がある分数を表します。 値が小さいほど、ダウンスケーリングはより積極的になります。

Ray 2.9.0 以降では、 autoscale_min_worker_nodes を設定して、Ray クラスターがアイドル状態のときに Ray クラスターがゼロ ワーカーにスケールダウンし、クラスターが終了するのを防ぐこともできます。

Ray ヘッドノードが使用するリソースを構成する

デフォルトでは、Ray on Spark 構成の場合、Databricks は Ray ヘッドノードに割り当てられるリソースを次のように制限します。

  • 0 CPUコア
  • 0 GPUの
  • 128 MB のヒープメモリ
  • 128 MBのオブジェクトストアメモリ

これは、Ray ヘッド ノードは通常、グローバル調整にのみ使用され、Ray タスクの実行には使用されないためです。 Apache Spark ドライバー ノード リソースは複数のユーザーと共有されるため、デフォルト設定では Apache Spark ドライバー側のリソースが節約されます。Ray 2.8.0 以降では、Ray ヘッド ノードが使用するリソースを構成できます。 setup_ray_cluster API で次の引数を使用します。

  • num_cpus_head_node:Rayヘッドノードが使用するCPUコアの設定
  • num_gpus_head_node:Rayヘッドノードが使用するGPUの設定
  • object_store_memory_head_node: Ray ヘッドノードによるオブジェクトストアのメモリサイズの設定

異種クラスターのサポート

Ray on Spark クラスターを作成して、より効率的で費用対効果の高いトレーニング 実行を実現し、Ray ヘッド ノードと Ray ワーカー ノード間で異なる構成を設定できます。 ただし、すべての Ray ワーカー ノードは同じ構成である必要があります。 Databricks クラスターは異種クラスターを完全にはサポートしていませんが、クラスターポリシーを設定することで、異なるドライバーインスタンスタイプとワーカーインスタンスタイプで Databricks クラスターを作成できます。例えば:

{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}

Ray クラスター設定の調整

各 Ray ワーカー ノードに推奨される構成は次のとおりです。 Ray ワーカー ノードあたり最小 4 つの CPU コア。各 Ray ワーカー ノードに最小 10 GB のヒープ メモリ。

そのため、 ray.util.spark.setup_ray_clusterを呼び出すときは、Databricks では num_cpus_per_node を 4 以上の値に設定することをお勧めします。

各 Ray ワーカー ノードのヒープ メモリのチューニングの詳細については、次のセクションを参照してください。

Ray ワーカー ノードのメモリ割り当て

各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。

各タイプに割り当てられたメモリ サイズは、以下で説明するように決定されます。

各 Ray ワーカー ノードに割り当てられるメモリの合計は次のとおりです。 RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES は、Apache Spark ワーカーノードで起動できる Ray ワーカーノードの最大数です。 これは、引数 num_cpus_per_node または num_gpus_per_nodeによって決まります。

引数 object_store_memory_per_nodeを設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは次のようになります。 RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7 OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3

引数を次のように設定するとobject_store_memory_per_node RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node

さらに、Ray ワーカー ノードあたりのオブジェクト ストア メモリ サイズは、オペレーティング システムの共有メモリによって制限されます。 最大値は次のとおりです。 OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)

SPARK_WORKER_NODE_OS_SHARED_MEMORY は、Apache Spark ワーカー ノードに設定された /dev/shm ディスク サイズです。

スケーリングのベストプラクティス

各 Ray ワーカー ノードの CPU と GPU の番号を設定します

引数 num_cpus_worker_node を Apache Spark ワーカー ノードあたりの CPU コア数に設定することをお勧めします。 同様に、Apache Spark ワーカー ノードあたりの GPU の数に num_gpus_worker_node を設定するのが最適です。 この構成では、各 Apache Spark ワーカー ノードは、各 Apache Spark ワーカー ノードのリソースを最大限に活用する 1 つの Ray ワーカー ノードを起動します。

クラスターを開始するときは、 クラスター設定内で に 環境変数RAY_memory_monitor_refresh_ms 0DatabricksApache Sparkを設定します。

Apache Spark と Ray のハイブリッドワークロードのメモリリソース設定

クラスターでハイブリッド ワークロードと Ray ワークロードを実行する場合、SparkDatabricksDatabricks では、Spark エグゼキューターのメモリを小さな値に減らすことをお勧めします。たとえば、 Databricks クラスター構成で spark.executor.memory 4g を設定します。

Apache SparkエグゼキューターはGCを遅延トリガーするJava処理で、Apache Sparkデータセットキャッシュはエグゼキューターのメモリを大量に消費Apache Spark。これにより、Ray が使用できるメモリが減少します。 メモリ不足エラーの可能性を回避するには、 spark.executor.memory 構成を減らします。

Apache Spark と Ray のハイブリッド ワークロードの計算リソースの構成

SparkDatabricksクラスターでハイブリッド ワークロードと Ray ワークロードを実行する場合は、クラスターノードまたは Ray ワーカーノードを自動スケーラブルにすることをお勧めします。例えば:

Databricks クラスターを開始するために使用できるワーカー ノードの数が固定されている場合は、Ray-on-Spark オートスケールを有効にすることをお勧めします。実行中の Ray ワークロードがない場合、Ray クラスターはスケールダウンされ、リソースを解放して Apache Spark タスクで使用できるようになります。 タスクが終了し Apache Spark Ray が再び使用されると、Ray-on-Spark クラスターは需要を満たすために再びスケールアップされます。

さらに、 Databricks と Ray-on-spark クラスターを自動スケーラブルにすることができます。 たとえば、 Databricks クラスターの自動スケーラブルノードを最大 10 ノードに設定し、Ray-on-Spark ワーカーノードを最大 4 ノードに設定し、各 Ray ワーカーノードが各 Apache Spark ワーカーのリソースを最大限に活用するように設定した場合、Ray ワークロードは、このようなクラスター構成で最大 4 つのノードリソースを使用できます。 これに対し、Apache Spark ジョブは、最大 6 ノード相当のリソースを割り当てることができます。