サーバレス GPU コンピュート上の Ray Tune を使用した GPU による分散XGBoost
このノートブックでは、 Databricksサーバーレス GPU コンピュートで Ray Tune を使用したハイパーパラメーター最適化によるエンドツーエンドの分散XGBoostトレーニングを示します。 内容は次のとおりです。
- 合成データセットの生成 : を使用して大規模な合成データセットを作成します。
dbldatagen - 分散トレーニング : 分散データ並列処理のためにXGBoostで Ray トレーニングするを使用する
- ハイパーパラメータ最適化 :Ray TuneとOptunaを活用したハイパーパラメータ自動探索
- MLflow統合 : エクスペリメント、メトリクス、登録するモデルをUnity Catalogに記録する
このデモでは、バイナリ分類に 30M 行 x 100 特徴列 x 1 ターゲットカラム (2 クラス) を使用します。 このデータセットは最大 12 GB に圧縮されており、分散トレーニング体験の優れた基盤を提供します。
サーバレスGPUコンピュートのメリット
- オンデマンド スケーリング : ワークロードの需要に基づいて GPU リソースを自動的にプロビジョニングおよびスケーリングします。
- コストの最適化 : リソースの自動クリーンアップにより、使用したコンピュート時間に対してのみ支払います
- インフラストラクチャ管理なし : 基盤となるハードウェアを管理せずに ML トレーニングに集中できます。
- 高可用性 :組み込み フォールトトレランスと自動フェイルオーバー機能
よくある質問
XGBoost の分散バージョンに切り替えるのはいつですか?
- 大規模な XGBoost データセットでは、分散データ並列処理 (DDP) を使用する必要があります。この例では、デモンストレーションのために 3000 万行を使用します。
- すべての CPU にわたる単一ノードとマルチスレッド、次に CPU を備えた複数のノードにわたる DDP、最後に複数の GPU を活用する DDP を検討します。
GPU を使用する場合、データセットにはどれくらいのメモリ (VRAM) が必要ですか?
- 3000万行×100列×4バイト(float32)=約12GB
- モデルをトレーニングするには、GPU全体でVRAMの合計2~4倍のデータフットプリント(2倍なので約24GB)が必要です。
- この追加メモリはラウンド、モデル サイズ、勾配、中間計算のブースティングに使用されます。
- A10G GPU (各24GB VRAM)はこのワークロードに最適です - モデルごとに1~2個のGPU
サーバレスGPUコンピュート仕様
Databricksサーバレス GPU コンピュート:
- GPU タイプ : NVIDIA A10G (24GB VRAM)、H100 (80GB VRAM)
- 自動スケーリング : ワークロードの需要に基づいて自動的にスケーリングします
- 課金 : 自動リソースクリーンアップによる秒単位課金
- 可用性 : 高可用性を備えたマルチリージョンサポート
- 統合 : Unity Catalog および MLflow とのシームレスな統合
推奨構成:
- ワーカー : 最適なパフォーマンスを実現する 2 ~ 4 人のレイ ワーカー
- GPU 割り当て : ワーカーあたり 1 つの A10G GPU (24GB VRAM)
- メモリ : データ前処理用にワーカーあたり 32GB の RAM
- ストレージ : データアクセスのためのUnity Catalog統合
サーバレス GPU コンピュートに接続し、依存関係をインストールする
ノートブックをサーバレス A10 GPU に接続します。
- 上部の 「接続」 ドロップダウンをクリックします。
- サーバレス GPU を選択します。
- ノートブックの右側にある 環境 サイドパネルを開きます。
- このデモでは、 アクセラレータ を A10 に設定します。
- [適用] を選択し、 [確認] をクリックして、この環境をノートブックに適用します。
%pip install -qU dbldatagen ray[all]==2.48.0 xgboost optuna mlflow>=3.0
# Note: serverless_gpu package should be pre-installed in the Serverless GPU environment
# If not available, install it using: %pip install databricks-serverless-gpu
dbutils.library.restartPython()
ウィジェットを使用してUnity Catalogパスとトレーニングを設定します
Unity Catalogパスとトレーニング構成用のウィジェットを作成します。 以下のコードでは、カスタマイズ可能なプレースホルダー値を使用しています。
# Define job inputs
dbutils.widgets.text("catalog", "main", "Unity Catalog Name")
dbutils.widgets.text("schema", "dev", "Unity Catalog Schema Name")
dbutils.widgets.text("num_training_rows", "30000000", "Number of training rows to generate")
dbutils.widgets.text("num_training_columns", "100", "Number of feature columns")
dbutils.widgets.text("num_labels", "2", "Number of labels in the target column")
dbutils.widgets.text("warehouse_id", "93a682dcf60dae13", "SQL Warehouse ID (optional, for reading from UC)")
dbutils.widgets.text("num_workers", "2", "Number of Ray workers per trial")
dbutils.widgets.text("num_hpo_trials", "8", "Number of hyperparameter optimization trials")
dbutils.widgets.text("max_concurrent_trials", "4", "Maximum concurrent HPO trials")
# Get parameter values (will override widget defaults if run by job)
UC_CATALOG = dbutils.widgets.get("catalog")
UC_SCHEMA = dbutils.widgets.get("schema")
NUM_TRAINING_ROWS = int(dbutils.widgets.get("num_training_rows"))
NUM_TRAINING_COLUMNS = int(dbutils.widgets.get("num_training_columns"))
NUM_LABELS = int(dbutils.widgets.get("num_labels"))
WAREHOUSE_ID = dbutils.widgets.get("warehouse_id")
NUM_WORKERS = int(dbutils.widgets.get("num_workers"))
NUM_HPO_TRIALS = int(dbutils.widgets.get("num_hpo_trials"))
MAX_CONCURRENT_TRIALS = int(dbutils.widgets.get("max_concurrent_trials"))
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"NUM_TRAINING_ROWS: {NUM_TRAINING_ROWS}")
print(f"NUM_TRAINING_COLUMNS: {NUM_TRAINING_COLUMNS}")
print(f"NUM_LABELS: {NUM_LABELS}")
print(f"NUM_WORKERS: {NUM_WORKERS}")
print(f"NUM_HPO_TRIALS: {NUM_HPO_TRIALS}")
ステップ 1: 合成データセットを生成する
デモンストレーションの目的で、 dbldatagenを使用して合成データセットを作成します。本番運用では、実際のトレーニング データを使用します。
import dbldatagen as dg
from pyspark.sql.types import FloatType, IntegerType
import os
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog
# Create schema if it doesn't exist
w = WorkspaceClient()
try:
created_schema = w.schemas.create(name=UC_SCHEMA, catalog_name=UC_CATALOG)
print(f"Schema '{created_schema.name}' created successfully")
except Exception as e:
# Handle the case where the schema already exists
print(f"Schema '{UC_SCHEMA}' already exists in catalog '{UC_CATALOG}'. Skipping schema creation.")
# Create volumes for data storage
parquet_write_path = f'/Volumes/{UC_CATALOG}/{UC_SCHEMA}/synthetic_data'
if not os.path.exists(parquet_write_path):
created_volume = w.volumes.create(
catalog_name=UC_CATALOG,
schema_name=UC_SCHEMA,
name='synthetic_data',
volume_type=catalog.VolumeType.MANAGED
)
print(f"Volume 'synthetic_data' at {parquet_write_path} created successfully")
else:
print(f"Volume {parquet_write_path} already exists. Skipping volumes creation.")
# Create volume for Ray storage
ray_storage_path = f'/Volumes/{UC_CATALOG}/{UC_SCHEMA}/ray_data_tmp_dir'
if not os.path.exists(ray_storage_path):
created_volume = w.volumes.create(
catalog_name=UC_CATALOG,
schema_name=UC_SCHEMA,
name='ray_data_tmp_dir',
volume_type=catalog.VolumeType.MANAGED
)
print(f"Volume 'ray_data_tmp_dir' at {ray_storage_path} created successfully")
else:
print(f"Volume {ray_storage_path} already exists. Skipping volumes creation.")
# Generate synthetic dataset
table_name = f"synthetic_data_{NUM_TRAINING_ROWS}_rows_{NUM_TRAINING_COLUMNS}_columns_{NUM_LABELS}_labels"
label = "target"
print(f"Generating {NUM_TRAINING_ROWS} synthetic rows")
print(f"Generating {NUM_TRAINING_COLUMNS} synthetic columns")
print(f"Generating {NUM_LABELS} synthetic labels")
testDataSpec = (
dg.DataGenerator(spark, name="synthetic_data", rows=NUM_TRAINING_ROWS)
.withIdOutput()
.withColumn(
"r",
FloatType(),
expr="rand()",
numColumns=NUM_TRAINING_COLUMNS,
)
.withColumn(
"target",
IntegerType(),
expr=f"floor(rand()*{NUM_LABELS})",
numColumns=1
)
)
df = testDataSpec.build()
df = df.repartition(50)
# Write to Delta table
df.write.format("delta").mode("overwrite").option("delta.enableDeletionVectors", "true").saveAsTable(
f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}"
)
# Write to Parquet for Ray dataset reading (backup option)
df.write.mode("overwrite").format("parquet").save(
f"{parquet_write_path}/{table_name}"
)
print(f"Dataset created successfully: {UC_CATALOG}.{UC_SCHEMA}.{table_name}")
ステップ 2: Ray データセットとトレーニング関数をセットアップする
Unity Catalogから読み取り、分散トレーニング関数をセットアップするように Ray を構成します。
import ray
import os
# Set up environment variables for MLflow
os.environ['DATABRICKS_HOST'] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
def read_ray_dataset(catalog, schema, table, warehouse_id=None):
"""
Read data from Unity Catalog into a Ray Dataset.
Args:
catalog: Unity Catalog name
schema: Schema name
table: Table name
warehouse_id: Optional SQL Warehouse ID for reading from UC
Returns:
train_dataset, val_dataset: Ray Datasets for training and validation
"""
try:
## Option 1 (PREFERRED): Build a Ray Dataset using a Databricks SQL Warehouse
if warehouse_id and warehouse_id.strip():
ds = ray.data.read_databricks_tables(
warehouse_id=warehouse_id,
catalog=catalog,
schema=schema,
query=f'SELECT * FROM {catalog}.{schema}.{table}',
)
print('Read directly from Unity Catalog using SQL Warehouse')
else:
raise ValueError("Warehouse ID not provided - falling back to Parquet")
except Exception as e:
print(f"Note: {e}")
## Option 2: Build a Ray Dataset using Parquet files
# If you have too many Ray nodes, you may not be able to create a Ray dataset
# using the warehouse method above because of rate limits. One backup solution
# is to create parquet files from the delta table and build a ray dataset from that.
parquet_path = f'/Volumes/{catalog}/{schema}/synthetic_data/{table}'
ds = ray.data.read_parquet(parquet_path)
print('Read directly from Parquet files')
train_dataset, val_dataset = ds.train_test_split(test_size=0.25)
return train_dataset, val_dataset
ステップ 3: XGBoostトレーニング関数を定義する
ワーカーごとのトレーニング機能と、分散トレーニングを調整するドライバー機能を定義します。
import xgboost
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
def train_fn_per_worker(params: dict):
"""
Trains an XGBoost model on a shard of the distributed dataset assigned to this worker.
This function is designed to be executed by individual Ray Train workers.
It retrieves the training and validation data shards, converts them to DMatrix format,
and performs a portion of the distributed XGBoost training. Ray Train handles
the inter-worker communication.
Args:
params (dict): A dictionary of XGBoost training parameters, including
'num_estimators', 'eval_metric', and potentially other
XGBoost-specific parameters.
"""
# Get dataset shards for this worker
train_shard = ray.train.get_dataset_shard("train")
val_shard = ray.train.get_dataset_shard("val")
# Convert shards to pandas DataFrames
train_df = train_shard.materialize().to_pandas()
val_df = val_shard.materialize().to_pandas()
train_X = train_df.drop(label, axis=1)
train_y = train_df[label]
val_X = val_df.drop(label, axis=1)
val_y = val_df[label]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(val_X, label=val_y)
# Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for workers to communicate with each other.
evals_results = {}
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=params['num_estimators'],
evals_result=evals_results,
callbacks=[RayTrainReportCallback(
metrics={params['eval_metric']: f"validation-{params['eval_metric']}"},
frequency=1
)],
)
def train_driver_fn(config: dict, train_dataset, val_dataset, ray_storage_path: str):
"""
Drives the distributed XGBoost training process using Ray Train.
This function sets up the XGBoostTrainer, configures scaling (number of workers, GPU usage,
and resources per worker), and initiates the distributed training by calling `trainer.fit()`.
It also propagates metrics back to Ray Tune if integrated.
Args:
config (dict): A dictionary containing run-level hyperparameters such as
'num_workers', 'use_gpu', and a nested 'params' dictionary
for XGBoost training parameters.
train_dataset: The Ray Dataset for training.
val_dataset: The Ray Dataset for validation.
ray_storage_path: Path for Ray storage.
Returns:
None: The function reports metrics to Ray Tune but does not explicitly return a value.
The trained model artifact is typically handled by Ray Train's checkpointing.
"""
# Unpack run-level hyperparameters.
num_workers = config["num_workers"]
use_gpu = config["use_gpu"]
params = config['params']
# Initialize the XGBoostTrainer, which orchestrates the distributed training using Ray.
trainer = XGBoostTrainer(
train_loop_per_worker=train_fn_per_worker, # The function to be executed on each worker
train_loop_config=params,
# By default Ray uses 1 GPU and 1 CPU per worker if resources_per_worker is not specified.
# XGBoost is multi-threaded, so multiple CPUs can be assigned per worker, but not GPUs.
scaling_config=ray.train.ScalingConfig(
num_workers=num_workers,
use_gpu=use_gpu,
resources_per_worker={"CPU": 12, "GPU": 1}
),
datasets={"train": train_dataset, "val": val_dataset}, # Ray Datasets to be used by the trainer + workers
run_config=ray.train.RunConfig(storage_path=ray_storage_path)
)
result = trainer.fit()
# Propagate metrics back up for Ray Tune.
# Ensure the metric key matches your eval_metric.
ray.tune.report(
{params['eval_metric']: result.metrics.get('mlogloss', result.metrics.get('validation-mlogloss', 0.0))},
checkpoint=result.checkpoint
)
ステップ 4: Ray Tune によるハイパーパラメータの最適化
Optuna と Ray Tune を組み合わせてハイパーパラメータを自動探索し、 MLflowと統合してエクスペリメントの追跡を実現します。
import mlflow
from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.search.optuna import OptunaSearch
from ray.air.integrations.mlflow import MLflowLoggerCallback
# Import serverless GPU launcher
try:
from serverless_gpu.ray import ray_launch
except ImportError:
raise ImportError(
"serverless_gpu package not found. Please ensure you're running on Serverless GPU compute "
"or install it using: %pip install databricks-serverless-gpu"
)
# Set up MLflow experiment
username = spark.sql("SELECT session_user()").collect()[0][0]
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split("/")[-1]
experiment_name = f"/Users/{username}/{notebook_name}"
# Configure MLflow to use Unity Catalog
mlflow.set_registry_uri("databricks-uc")
@ray.remote(num_cpus=1) # Ensure main_task is not scheduled on head
class TaskRunner:
def run(self):
# Load dataset as distributed Ray Dataset
train_dataset, val_dataset = read_ray_dataset(
UC_CATALOG, UC_SCHEMA, table_name, WAREHOUSE_ID if WAREHOUSE_ID and WAREHOUSE_ID.strip() else None
)
# Define the hyperparameter search space.
param_space = {
"num_workers": NUM_WORKERS,
"use_gpu": True,
"params": {
"objective": "multi:softmax",
'eval_metric': 'mlogloss',
"tree_method": "hist",
"device": "cuda",
"num_class": NUM_LABELS,
"learning_rate": tune.uniform(0.01, 0.3),
"num_estimators": tune.randint(20, 30)
}
}
# Set up search algorithm. Here we use Optuna with the default Bayesian sampler (TPES)
optuna = OptunaSearch(
metric=param_space['params']['eval_metric'],
mode="min"
)
with mlflow.start_run() as run:
# Set up Tuner job and run.
tuner = tune.Tuner(
tune.with_parameters(
train_driver_fn,
train_dataset=train_dataset,
val_dataset=val_dataset,
ray_storage_path=ray_storage_path
),
run_config=tune.RunConfig(
name='xgboost_raytune_run',
storage_path=ray_storage_path,
callbacks=[MLflowLoggerCallback(
save_artifact=True,
tags={"mlflow.parentRunId": run.info.run_id},
log_params_on_trial_end=True
)]
),
tune_config=tune.TuneConfig(
num_samples=NUM_HPO_TRIALS,
max_concurrent_trials=MAX_CONCURRENT_TRIALS,
search_alg=optuna,
),
param_space=param_space,
)
results = tuner.fit()
return results
ステップ 5: 分散トレーニングを開始する
分散サーバレス GPU APIを使用して、リモート A10 GPU で分散モデル トレーニングを実行します。
@ray_launch(gpus=8, gpu_type='A10', remote=True)
def my_ray_function():
runner = TaskRunner.remote()
return ray.get(runner.run.remote())
results = my_ray_function.distributed()
ステップ 6: 最適なモデルを取得してMLflowにログを記録する
ハイパーパラメータ最適化から最適なモデルを抽出し、 Unity Catalogに登録します。
# Get the best result
results = results[0] if type(results) == list else results
best_result = results.get_best_result(metric="mlogloss", mode="min")
best_params = best_result.config
print(f"Best hyperparameters: {best_params}")
print(f"Best validation mlogloss: {best_result.metrics.get('mlogloss', 'N/A')}")
# Load the best model
booster = RayTrainReportCallback.get_model(best_result.checkpoint)
# Sample data for input example
sample_data = spark.read.table(f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}").limit(5).toPandas()
with mlflow.start_run() as run:
logged_model = mlflow.xgboost.log_model(
booster,
"model",
input_example=sample_data[[col for col in sample_data.columns if col != label]]
)
print(f"Model logged to MLflow: {logged_model.model_uri}")
ステップ 7: テストモデル推論
記録済みモデルをロードし、サンプルデータの推論をテストします。
# Load the model
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
# Test inference
test_data = spark.read.table(f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}").limit(10).toPandas()
predictions = loaded_model.predict(test_data[[col for col in test_data.columns if col != label]])
print("Sample predictions:")
print(predictions)
print("\nSample actual labels:")
print(test_data[label].values)
次のステップ
Databricksサーバレス GPU で Ray Tune を使用したハイパーパラメータ最適化による分散XGBoostトレーニングが正常に完了しました。
カスタマイズオプション
- データセットをスケールする : より大きなデータセットに合わせて
num_training_rowsウィジェットとnum_training_columnsウィジェットを調整します。 - ハイパーパラメータを調整する : ステップ 4 の
param_spaceを変更して、別のXGBoost分散を調べます - GPU リソースを増やす : コンピュート能力を高めるために、
@ray_launchデコレータのgpus問題を更新します - ワーカーを調整 : トレーニングの並列処理をスケールするために
num_workersウィジェットを変更します - さまざまなモデルを試す : XGBoost を他の Ray Train 互換フレームワークに置き換える
リソース
- サーバーレス GPU APIドキュメント
- Ray トレーニングするドキュメント
- Ray Tune ドキュメント
- XGBoost ドキュメント
- MLflow ドキュメント
- Unity Catalogドキュメント
掃除
ノートブックが切断されると、GPU リソースは自動的にクリーンアップされます。手動で切断するには:
- 「コンピュート」ドロップダウンで 「接続済み」 をクリックします。
- サーバレス の上にマウスを移動します
- ドロップダウンメニューから 「終了」 を選択します