Databricks での Scale Ray クラスター
最適なパフォーマンスを得るために Ray クラスターのサイズを調整する方法 (オートスケール、ヘッド ノード構成、異種クラスター、リソース割り当てなど) について説明します。
オートスケールモードでの Ray クラスターの作成
Ray 2.8.0以降では、Databricksオートスケールとの統合をサポートするDatabricksでRayクラスターが開始されました。このオートスケールの統合によりDatabricksDatabricks環境内で内部的にクラスター オートスケールがトリガーされます。
オートスケールを有効にするには、次のコマンドを実行します。
Ray バージョン 2.10 未満の場合:
from ray.util.spark import setup_ray_cluster
setup_ray_cluster(
num_worker_nodes=8,
autoscale=True,
)
Rayバージョン2.10以降の場合:
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
setup_ray_cluster(
min_worker_nodes=2,
max_worker_nodes=4,
num_cpus_per_node=4,
collect_log_to_path="/dbfs/path/to/ray_collected_logs"
)
# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
ray.util.spark.setup_ray_cluster
API は Ray クラスターを Apache Sparkに作成します。 内部的には、バックグラウンドの Apache Spark ジョブが作成されます。 ジョブ内の各 Apache Spark タスクは Ray ワーカー ノードを作成し、Ray ヘッド ノードはドライバー上に作成されます。 引数 min_worker_nodes
と max_worker_nodes
は、Ray ワークロード用に作成して使用する Ray ワーカー ノードの範囲を表します。 引数 min_worker_nodes
を未定義のままにすると、固定サイズの Ray クラスターが、使用可能なワーカーの数 max_worker_nodes
で開始されます。 各 Ray ワーカーノードに割り当てる CPU または GPU コアの数を指定するには、引数 num_cpus_worker_node
(デフォルト value: 1) または num_gpus_worker_node
(デフォルト value: 0) を設定します。
Ray バージョン 2.10 未満で、オートスケールが有効な場合、 num_worker_nodes
は Ray ワーカーノードの最大数を示します。 Ray ワーカー ノードのデフォルトの最小数は 0 です。 このデフォルトの設定は、Ray クラスターがアイドル状態になると、Ray ワーカー ノードがゼロにスケールダウンすることを意味します。 これは、すべてのシナリオで高速な応答性を実現するのに理想的ではないかもしれませんが、有効にするとコストを大幅に削減できます。
オートスケールモードでは、ワーカーを ray.util.spark.MAX_NUM_WORKER_NODES
に設定することはできません。
次の引数は、アップスケーリングとダウンスケーリングの速度を構成します。
autoscale_upscaling_speed
保留にできるノードの数を、現在のノード数の倍数で表します。 値が大きいほど、アップスケーリングはより積極的になります。 たとえば、これを 1.0 に設定すると、クラスターのサイズはいつでも最大 100% 増加する可能性があります。autoscale_idle_timeout_minutes
オートスケーラーがアイドル状態のワーカー ノードを削除するまでに経過する必要がある分数を表します。 値が小さいほど、ダウンスケーリングはより積極的になります。
Ray 2.9.0 以降では、 autoscale_min_worker_nodes
を設定して、Ray クラスターがアイドル状態のときに Ray クラスターがゼロ ワーカーにスケールダウンし、クラスターが終了するのを防ぐこともできます。
Ray ヘッドノードが使用するリソースを構成する
デフォルトでは、Ray on Spark 構成の場合、Databricks は Ray ヘッドノードに割り当てられるリソースを次のように制限します。
- 0 CPUコア
- 0 GPUの
- 128 MB のヒープメモリ
- 128 MBのオブジェクトストアメモリ
これは、Ray ヘッド ノードは通常、グローバル調整にのみ使用され、Ray タスクの実行には使用されないためです。 Apache Spark ドライバー ノード リソースは複数のユーザーと共有されるため、デフォルト設定では Apache Spark ドライバー側のリソースが節約されます。Ray 2.8.0 以降では、Ray ヘッド ノードが使用するリソースを構成できます。 setup_ray_cluster API で次の引数を使用します。
num_cpus_head_node
:Rayヘッドノードが使用するCPUコアの設定num_gpus_head_node
:Rayヘッドノードが使用するGPUの設定object_store_memory_head_node
: Ray ヘッドノードによるオブジェクトストアのメモリサイズの設定
異種クラスターのサポート
Ray on Spark クラスターを作成して、より効率的で費用対効果の高いトレーニング 実行を実現し、Ray ヘッド ノードと Ray ワーカー ノード間で異なる構成を設定できます。 ただし、すべての Ray ワーカー ノードは同じ構成である必要があります。 Databricks クラスターは異種クラスターを完全にはサポートしていませんが、クラスターポリシーを設定することで、異なるドライバーインスタンスタイプとワーカーインスタンスタイプで Databricks クラスターを作成できます。例えば:
{
"node_type_id": {
"type": "fixed",
"value": "i3.xlarge"
},
"driver_node_type_id": {
"type": "fixed",
"value": "g4dn.xlarge"
},
"spark_version": {
"type": "fixed",
"value": "13.x-snapshot-gpu-ml-scala2.12"
}
}
Ray クラスター設定の調整
各 Ray ワーカー ノードに推奨される構成は次のとおりです。 Ray ワーカー ノードあたり最小 4 つの CPU コア。各 Ray ワーカー ノードに最小 10 GB のヒープ メモリ。
そのため、 ray.util.spark.setup_ray_cluster
を呼び出すときは、Databricks では num_cpus_per_node
を 4 以上の値に設定することをお勧めします。
各 Ray ワーカー ノードのヒープ メモリのチューニングの詳細については、次のセクションを参照してください。
Ray ワーカー ノードのメモリ割り当て
各 Ray ワーカー ノードは、ヒープ メモリとオブジェクト ストア メモリの 2 種類のメモリを使用します。
各タイプに割り当てられたメモリ サイズは、以下で説明するように決定されます。
各 Ray ワーカー ノードに割り当てられるメモリの合計は次のとおりです。
RAY_WORKER_NODE_TOTAL_MEMORY = (SPARK_WORKER_NODE_PHYSICAL_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES
は、Apache Spark ワーカーノードで起動できる Ray ワーカーノードの最大数です。 これは、引数 num_cpus_per_node
または num_gpus_per_node
によって決まります。
引数 object_store_memory_per_node
を設定しない場合、各 Ray ワーカー ノードに割り当てられるヒープ メモリ サイズとオブジェクト ストア メモリ サイズは次のようになります。
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY * 0.7
OBJECT_STORE_MEMORY_PER_NODE = RAY_WORKER_NODE_TOTAL_MEMORY * 0.3
引数を次のように設定するとobject_store_memory_per_node
RAY_WORKER_NODE_HEAP_MEMORY = RAY_WORKER_NODE_TOTAL_MEMORY - argument_object_store_memory_per_node
さらに、Ray ワーカー ノードあたりのオブジェクト ストア メモリ サイズは、オペレーティング システムの共有メモリによって制限されます。 最大値は次のとおりです。
OBJECT_STORE_MEMORY_PER_NODE_CAP = (SPARK_WORKER_NODE_OS_SHARED_MEMORY / MAX_NUMBER_OF_LOCAL_RAY_WORKER_NODES * 0.8)
SPARK_WORKER_NODE_OS_SHARED_MEMORY
は、Apache Spark ワーカー ノードに設定された /dev/shm
ディスク サイズです。
スケーリングのベストプラクティス
各 Ray ワーカー ノードの CPU と GPU の番号を設定します
引数 num_cpus_worker_node
を Apache Spark ワーカー ノードあたりの CPU コア数に設定することをお勧めします。 同様に、Apache Spark ワーカー ノードあたりの GPU の数に num_gpus_worker_node
を設定するのが最適です。 この構成では、各 Apache Spark ワーカー ノードは、各 Apache Spark ワーカー ノードのリソースを最大限に活用する 1 つの Ray ワーカー ノードを起動します。
クラスターを開始するときは、 クラスター設定内で に 環境変数RAY_memory_monitor_refresh_ms
0
DatabricksApache Sparkを設定します。
Apache Spark と Ray のハイブリッドワークロードのメモリリソース設定
クラスターでハイブリッド ワークロードと Ray ワークロードを実行する場合、SparkDatabricksDatabricks では、Spark エグゼキューターのメモリを小さな値に減らすことをお勧めします。たとえば、 Databricks クラスター構成で spark.executor.memory 4g
を設定します。
Apache SparkエグゼキューターはGCを遅延トリガーするJava処理で、Apache Sparkデータセットキャッシュはエグゼキューターのメモリを大量に消費Apache Spark。これにより、Ray が使用できるメモリが減少します。 メモリ不足エラーの可能性を回避するには、 spark.executor.memory
構成を減らします。
Apache Spark と Ray のハイブリッド ワークロードの計算リソースの構成
SparkDatabricksクラスターでハイブリッド ワークロードと Ray ワークロードを実行する場合は、クラスターノードまたは Ray ワーカーノードを自動スケーラブルにすることをお勧めします。例えば:
Databricks クラスターを開始するために使用できるワーカー ノードの数が固定されている場合は、Ray-on-Spark オートスケールを有効にすることをお勧めします。実行中の Ray ワークロードがない場合、Ray クラスターはスケールダウンされ、リソースを解放して Apache Spark タスクで使用できるようになります。 タスクが終了し Apache Spark Ray が再び使用されると、Ray-on-Spark クラスターは需要を満たすために再びスケールアップされます。
さらに、 Databricks と Ray-on-spark クラスターを自動スケーラブルにすることができます。 たとえば、 Databricks クラスターの自動スケーラブルノードを最大 10 ノードに設定し、Ray-on-Spark ワーカーノードを最大 4 ノードに設定し、各 Ray ワーカーノードが各 Apache Spark ワーカーのリソースを最大限に活用するように設定した場合、Ray ワークロードは、このようなクラスター構成で最大 4 つのノードリソースを使用できます。 これに対し、Apache Spark ジョブは、最大 6 ノード相当のリソースを割り当てることができます。