XGBoost distribuído com GPUs usando Ray Tune em computação GPU sem servidor
Este notebook demonstra o treinamento distribuído de ponta a ponta XGBoost com otimização de hiperparâmetros usando Ray Tune em computação GPU sem servidor Databricks . Abrange:
- Geração de conjuntos de dados sintéticos : Criação de um dataset sintéticos de grande escala usando
dbldatagen - Treinamento distribuído : Usando o Ray ensinando com XGBoost para paralelismo de dados distribuídos
- Otimização de hiperparâmetros : Utilizando o Ray Tune com o Optuna para busca automatizada de hiperparâmetros.
- IntegraçãoMLflow : Registro de experimentos, métricas e modelos no Unity Catalog
Esta demonstração usa 30 milhões de linhas x 100 colunas de recurso x 1 coluna de destino (2 classes) para classificação binária. Este dataset tem aproximadamente 12 GB de tamanho compactado e oferece uma excelente base para experimentos de treinamento distribuído.
Computação GPU sem servidor Benefícios
- Dimensionamento sob demanda : provisionamento e escalonamento automáticos de recursos de GPU com base nas demandas de carga de trabalho.
- Otimização de custos : pague apenas pelo tempo compute utilizado, com limpeza automática de recursos.
- Sem gerenciamento de infraestrutura : concentre-se no treinamento de ML sem se preocupar com o hardware subjacente.
- Alta disponibilidade : Tolerância a falhas integrada e recursos de failover automático.
Perguntas frequentes
Quando devo migrar para uma versão distribuída do XGBoost?
- Conjuntos de dados XGBoost de grande porte devem utilizar paralelismo de dados distribuído (DDP). Este exemplo utiliza 30 milhões de linhas para fins de demonstração.
- Considere um único nó e multithreading em todas as CPUs, depois DDP em vários nós com CPUs e, por fim, DDP utilizando várias GPUs.
Se eu estiver usando GPUs, quanta memória (VRAM) eu preciso para meu dataset?
- 30 milhões de linhas x 100 colunas x 4 bytes (float32) = ~12 GB
- É necessário um total de 2 a 4 vezes a quantidade de dados em VRAM nas GPUs (2 vezes, ou seja, ~24 GB) para treinar o modelo.
- Essa memória extra é necessária para as rodadas de boosting, o tamanho do modelo, os gradientes e os cálculos intermediários.
- As GPUs A10G (com 24 GB de VRAM cada) são perfeitas para essa carga de trabalho — 1 a 2 GPUs por modelo.
Especificações de computação GPU sem servidor
Computação GPU sem servidor Databricks :
- Tipos de GPU : NVIDIA A10G (24 GB de VRAM), H100 (80 GB de VRAM)
- Dimensionamento automático : Aumenta automaticamente a capacidade com base nas demandas de carga de trabalho.
- Cobrança : Cobrança por segundo com limpeza automática de recursos.
- Disponibilidade : Suporte multirregional com alta disponibilidade.
- Integração : Integração perfeita com o Unity Catalog e o MLflow.
Configuração recomendada:
- trabalhador : trabalhador de 2-4 raios para desempenho ideal
- Alocação de GPU : 1 GPU A10G por worker (24 GB de VRAM)
- Memória : 32 GB de RAM por worker para pré-processamento de dados.
- Armazenamento : Integração com Unity Catalog para acesso aos dados.
Conecte-se ao compute GPU serverless e instale as dependências.
Conecte seu notebook à GPU A10 serverless :
- Clique no dropdown "Conectar" na parte superior.
- Selecione GPU sem servidor .
- Abra o painel lateral Ambiente , localizado no lado direito do Notebook.
- Configure o acelerador para A10 para esta demonstração.
- Selecione Aplicar e clique em Confirmar para aplicar este ambiente ao seu Notebook.
%pip install -qU dbldatagen ray[all]==2.48.0 xgboost optuna mlflow>=3.0
# Note: serverless_gpu package should be pre-installed in the Serverless GPU environment
# If not available, install it using: %pip install databricks-serverless-gpu
dbutils.library.restartPython()
Use widgets para definir os caminhos do seu Unity Catalog e os parâmetros de treinamento.
Crie widgets para o caminho Unity Catalog e para a configuração de treinamento. O código abaixo usa valores de espaço reservado que você pode personalizar.
# Define job inputs
dbutils.widgets.text("catalog", "main", "Unity Catalog Name")
dbutils.widgets.text("schema", "dev", "Unity Catalog Schema Name")
dbutils.widgets.text("num_training_rows", "30000000", "Number of training rows to generate")
dbutils.widgets.text("num_training_columns", "100", "Number of feature columns")
dbutils.widgets.text("num_labels", "2", "Number of labels in the target column")
dbutils.widgets.text("warehouse_id", "93a682dcf60dae13", "SQL Warehouse ID (optional, for reading from UC)")
dbutils.widgets.text("num_workers", "2", "Number of Ray workers per trial")
dbutils.widgets.text("num_hpo_trials", "8", "Number of hyperparameter optimization trials")
dbutils.widgets.text("max_concurrent_trials", "4", "Maximum concurrent HPO trials")
# Get parameter values (will override widget defaults if run by job)
UC_CATALOG = dbutils.widgets.get("catalog")
UC_SCHEMA = dbutils.widgets.get("schema")
NUM_TRAINING_ROWS = int(dbutils.widgets.get("num_training_rows"))
NUM_TRAINING_COLUMNS = int(dbutils.widgets.get("num_training_columns"))
NUM_LABELS = int(dbutils.widgets.get("num_labels"))
WAREHOUSE_ID = dbutils.widgets.get("warehouse_id")
NUM_WORKERS = int(dbutils.widgets.get("num_workers"))
NUM_HPO_TRIALS = int(dbutils.widgets.get("num_hpo_trials"))
MAX_CONCURRENT_TRIALS = int(dbutils.widgets.get("max_concurrent_trials"))
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"NUM_TRAINING_ROWS: {NUM_TRAINING_ROWS}")
print(f"NUM_TRAINING_COLUMNS: {NUM_TRAINING_COLUMNS}")
print(f"NUM_LABELS: {NUM_LABELS}")
print(f"NUM_WORKERS: {NUM_WORKERS}")
print(f"NUM_HPO_TRIALS: {NUM_HPO_TRIALS}")
o passo 1: Gerar conjunto de dados sintéticos
Criaremos um dataset sintéticos usando dbldatagen para fins de demonstração. Em produção, você usaria seus dados de treinamento reais.
import dbldatagen as dg
from pyspark.sql.types import FloatType, IntegerType
import os
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog
# Create schema if it doesn't exist
w = WorkspaceClient()
try:
created_schema = w.schemas.create(name=UC_SCHEMA, catalog_name=UC_CATALOG)
print(f"Schema '{created_schema.name}' created successfully")
except Exception as e:
# Handle the case where the schema already exists
print(f"Schema '{UC_SCHEMA}' already exists in catalog '{UC_CATALOG}'. Skipping schema creation.")
# Create volumes for data storage
parquet_write_path = f'/Volumes/{UC_CATALOG}/{UC_SCHEMA}/synthetic_data'
if not os.path.exists(parquet_write_path):
created_volume = w.volumes.create(
catalog_name=UC_CATALOG,
schema_name=UC_SCHEMA,
name='synthetic_data',
volume_type=catalog.VolumeType.MANAGED
)
print(f"Volume 'synthetic_data' at {parquet_write_path} created successfully")
else:
print(f"Volume {parquet_write_path} already exists. Skipping volumes creation.")
# Create volume for Ray storage
ray_storage_path = f'/Volumes/{UC_CATALOG}/{UC_SCHEMA}/ray_data_tmp_dir'
if not os.path.exists(ray_storage_path):
created_volume = w.volumes.create(
catalog_name=UC_CATALOG,
schema_name=UC_SCHEMA,
name='ray_data_tmp_dir',
volume_type=catalog.VolumeType.MANAGED
)
print(f"Volume 'ray_data_tmp_dir' at {ray_storage_path} created successfully")
else:
print(f"Volume {ray_storage_path} already exists. Skipping volumes creation.")
# Generate synthetic dataset
table_name = f"synthetic_data_{NUM_TRAINING_ROWS}_rows_{NUM_TRAINING_COLUMNS}_columns_{NUM_LABELS}_labels"
label = "target"
print(f"Generating {NUM_TRAINING_ROWS} synthetic rows")
print(f"Generating {NUM_TRAINING_COLUMNS} synthetic columns")
print(f"Generating {NUM_LABELS} synthetic labels")
testDataSpec = (
dg.DataGenerator(spark, name="synthetic_data", rows=NUM_TRAINING_ROWS)
.withIdOutput()
.withColumn(
"r",
FloatType(),
expr="rand()",
numColumns=NUM_TRAINING_COLUMNS,
)
.withColumn(
"target",
IntegerType(),
expr=f"floor(rand()*{NUM_LABELS})",
numColumns=1
)
)
df = testDataSpec.build()
df = df.repartition(50)
# Write to Delta table
df.write.format("delta").mode("overwrite").option("delta.enableDeletionVectors", "true").saveAsTable(
f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}"
)
# Write to Parquet for Ray dataset reading (backup option)
df.write.mode("overwrite").format("parquet").save(
f"{parquet_write_path}/{table_name}"
)
print(f"Dataset created successfully: {UC_CATALOG}.{UC_SCHEMA}.{table_name}")
o passo 2: Configurar conjunto de dados Ray e funções de treinamento
Configure o Ray para ler do Unity Catalog e configure as funções de treinamento distribuídas.
import ray
import os
# Set up environment variables for MLflow
os.environ['DATABRICKS_HOST'] = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
def read_ray_dataset(catalog, schema, table, warehouse_id=None):
"""
Read data from Unity Catalog into a Ray Dataset.
Args:
catalog: Unity Catalog name
schema: Schema name
table: Table name
warehouse_id: Optional SQL Warehouse ID for reading from UC
Returns:
train_dataset, val_dataset: Ray Datasets for training and validation
"""
try:
## Option 1 (PREFERRED): Build a Ray Dataset using a Databricks SQL Warehouse
if warehouse_id and warehouse_id.strip():
ds = ray.data.read_databricks_tables(
warehouse_id=warehouse_id,
catalog=catalog,
schema=schema,
query=f'SELECT * FROM {catalog}.{schema}.{table}',
)
print('Read directly from Unity Catalog using SQL Warehouse')
else:
raise ValueError("Warehouse ID not provided - falling back to Parquet")
except Exception as e:
print(f"Note: {e}")
## Option 2: Build a Ray Dataset using Parquet files
# If you have too many Ray nodes, you may not be able to create a Ray dataset
# using the warehouse method above because of rate limits. One backup solution
# is to create parquet files from the delta table and build a ray dataset from that.
parquet_path = f'/Volumes/{catalog}/{schema}/synthetic_data/{table}'
ds = ray.data.read_parquet(parquet_path)
print('Read directly from Parquet files')
train_dataset, val_dataset = ds.train_test_split(test_size=0.25)
return train_dataset, val_dataset
o passo 3: Definir funções XGBoost treinamento
Defina a função de treinamento porworker e a função principal que orquestra o treinamento distribuído.
import xgboost
import ray.train
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
def train_fn_per_worker(params: dict):
"""
Trains an XGBoost model on a shard of the distributed dataset assigned to this worker.
This function is designed to be executed by individual Ray Train workers.
It retrieves the training and validation data shards, converts them to DMatrix format,
and performs a portion of the distributed XGBoost training. Ray Train handles
the inter-worker communication.
Args:
params (dict): A dictionary of XGBoost training parameters, including
'num_estimators', 'eval_metric', and potentially other
XGBoost-specific parameters.
"""
# Get dataset shards for this worker
train_shard = ray.train.get_dataset_shard("train")
val_shard = ray.train.get_dataset_shard("val")
# Convert shards to pandas DataFrames
train_df = train_shard.materialize().to_pandas()
val_df = val_shard.materialize().to_pandas()
train_X = train_df.drop(label, axis=1)
train_y = train_df[label]
val_X = val_df.drop(label, axis=1)
val_y = val_df[label]
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(val_X, label=val_y)
# Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for workers to communicate with each other.
evals_results = {}
bst = xgboost.train(
params,
dtrain=dtrain,
evals=[(deval, "validation")],
num_boost_round=params['num_estimators'],
evals_result=evals_results,
callbacks=[RayTrainReportCallback(
metrics={params['eval_metric']: f"validation-{params['eval_metric']}"},
frequency=1
)],
)
def train_driver_fn(config: dict, train_dataset, val_dataset, ray_storage_path: str):
"""
Drives the distributed XGBoost training process using Ray Train.
This function sets up the XGBoostTrainer, configures scaling (number of workers, GPU usage,
and resources per worker), and initiates the distributed training by calling `trainer.fit()`.
It also propagates metrics back to Ray Tune if integrated.
Args:
config (dict): A dictionary containing run-level hyperparameters such as
'num_workers', 'use_gpu', and a nested 'params' dictionary
for XGBoost training parameters.
train_dataset: The Ray Dataset for training.
val_dataset: The Ray Dataset for validation.
ray_storage_path: Path for Ray storage.
Returns:
None: The function reports metrics to Ray Tune but does not explicitly return a value.
The trained model artifact is typically handled by Ray Train's checkpointing.
"""
# Unpack run-level hyperparameters.
num_workers = config["num_workers"]
use_gpu = config["use_gpu"]
params = config['params']
# Initialize the XGBoostTrainer, which orchestrates the distributed training using Ray.
trainer = XGBoostTrainer(
train_loop_per_worker=train_fn_per_worker, # The function to be executed on each worker
train_loop_config=params,
# By default Ray uses 1 GPU and 1 CPU per worker if resources_per_worker is not specified.
# XGBoost is multi-threaded, so multiple CPUs can be assigned per worker, but not GPUs.
scaling_config=ray.train.ScalingConfig(
num_workers=num_workers,
use_gpu=use_gpu,
resources_per_worker={"CPU": 12, "GPU": 1}
),
datasets={"train": train_dataset, "val": val_dataset}, # Ray Datasets to be used by the trainer + workers
run_config=ray.train.RunConfig(storage_path=ray_storage_path)
)
result = trainer.fit()
# Propagate metrics back up for Ray Tune.
# Ensure the metric key matches your eval_metric.
ray.tune.report(
{params['eval_metric']: result.metrics.get('mlogloss', result.metrics.get('validation-mlogloss', 0.0))},
checkpoint=result.checkpoint
)
Passo 4: Otimização de Hiperparâmetros com Ray Tune
Utilize o Ray Tune com Optuna para busca automatizada de hiperparâmetros, integrado ao MLflow para acompanhamento de experimentos.
import mlflow
from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.search.optuna import OptunaSearch
from ray.air.integrations.mlflow import MLflowLoggerCallback
# Import serverless GPU launcher
try:
from serverless_gpu.ray import ray_launch
except ImportError:
raise ImportError(
"serverless_gpu package not found. Please ensure you're running on Serverless GPU compute "
"or install it using: %pip install databricks-serverless-gpu"
)
# Set up MLflow experiment
username = spark.sql("SELECT session_user()").collect()[0][0]
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split("/")[-1]
experiment_name = f"/Users/{username}/{notebook_name}"
# Configure MLflow to use Unity Catalog
mlflow.set_registry_uri("databricks-uc")
@ray.remote(num_cpus=1) # Ensure main_task is not scheduled on head
class TaskRunner:
def run(self):
# Load dataset as distributed Ray Dataset
train_dataset, val_dataset = read_ray_dataset(
UC_CATALOG, UC_SCHEMA, table_name, WAREHOUSE_ID if WAREHOUSE_ID and WAREHOUSE_ID.strip() else None
)
# Define the hyperparameter search space.
param_space = {
"num_workers": NUM_WORKERS,
"use_gpu": True,
"params": {
"objective": "multi:softmax",
'eval_metric': 'mlogloss',
"tree_method": "hist",
"device": "cuda",
"num_class": NUM_LABELS,
"learning_rate": tune.uniform(0.01, 0.3),
"num_estimators": tune.randint(20, 30)
}
}
# Set up search algorithm. Here we use Optuna with the default Bayesian sampler (TPES)
optuna = OptunaSearch(
metric=param_space['params']['eval_metric'],
mode="min"
)
with mlflow.start_run() as run:
# Set up Tuner job and run.
tuner = tune.Tuner(
tune.with_parameters(
train_driver_fn,
train_dataset=train_dataset,
val_dataset=val_dataset,
ray_storage_path=ray_storage_path
),
run_config=tune.RunConfig(
name='xgboost_raytune_run',
storage_path=ray_storage_path,
callbacks=[MLflowLoggerCallback(
save_artifact=True,
tags={"mlflow.parentRunId": run.info.run_id},
log_params_on_trial_end=True
)]
),
tune_config=tune.TuneConfig(
num_samples=NUM_HPO_TRIALS,
max_concurrent_trials=MAX_CONCURRENT_TRIALS,
search_alg=optuna,
),
param_space=param_space,
)
results = tuner.fit()
return results
o passo 5: Lançamento Treinamento Distribuído
Utilize a API de GPU distribuída serverless para executar treinamento de modelos distribuídos em GPUs A10 remotas.
@ray_launch(gpus=8, gpu_type='A10', remote=True)
def my_ray_function():
runner = TaskRunner.remote()
return ray.get(runner.run.remote())
results = my_ray_function.distributed()
o passo 6: Recuperar o melhor modelo e registrá-lo no MLflow
Extraia o melhor modelo da otimização de hiperparâmetros e registre-o no Unity Catalog.
# Get the best result
results = results[0] if type(results) == list else results
best_result = results.get_best_result(metric="mlogloss", mode="min")
best_params = best_result.config
print(f"Best hyperparameters: {best_params}")
print(f"Best validation mlogloss: {best_result.metrics.get('mlogloss', 'N/A')}")
# Load the best model
booster = RayTrainReportCallback.get_model(best_result.checkpoint)
# Sample data for input example
sample_data = spark.read.table(f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}").limit(5).toPandas()
with mlflow.start_run() as run:
logged_model = mlflow.xgboost.log_model(
booster,
"model",
input_example=sample_data[[col for col in sample_data.columns if col != label]]
)
print(f"Model logged to MLflow: {logged_model.model_uri}")
o passo 7: Inferência do modelo de teste
Carregue os modelos registrados e teste a inferência em dados de amostra.
# Load the model
loaded_model = mlflow.pyfunc.load_model(logged_model.model_uri)
# Test inference
test_data = spark.read.table(f"{UC_CATALOG}.{UC_SCHEMA}.{table_name}").limit(10).toPandas()
predictions = loaded_model.predict(test_data[[col for col in test_data.columns if col != label]])
print("Sample predictions:")
print(predictions)
print("\nSample actual labels:")
print(test_data[label].values)
Próximos passos
Você concluiu com sucesso o treinamento distribuído XGBoost com otimização de hiperparâmetros usando o Ray Tune em uma GPU sem servidor Databricks .
Opções de personalização
- Aumente a escala do seu dataset : ajuste os widgets
num_training_rowsenum_training_columnspara conjuntos de dados maiores. - Ajuste os hiperparâmetros : Modifique o
param_spacena etapa 4 para explorar diferentes parâmetros XGBoost - Aumentar o recurso da GPU : Atualize o parâmetro
gpusno decorador@ray_launchpara obter mais poder compute . - Ajustar trabalhador : Alterar widget
num_workerspara paralelismo de escala treinamento - Experimente modelos diferentes : Substitua o XGBoost por outras estruturas compatíveis com o Ray Train.
recurso
- Documentação API de GPU sem servidor
- Ray ensina Documentação
- Documentação Ray Tune
- Documentação do XGBoost
- Documentação do MLflow
- Documentação Unity Catalog
Limpar
Os recursos da GPU são limpos automaticamente quando o notebook é desconectado. Para desconectar manualmente:
- Clique em Conectado no dropdown compute
- Passe o cursor sobre "sem servidor"
- Selecione "Encerrar" no menu dropdown .