Pular para o conteúdo principal

Treinamento distribuído de modelos XGBoost usando xgboost.spark

info

Visualização

Esse recurso está em Public Preview.

O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark. Esse módulo inclui os estimadores do xgboost PySpark xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker. Essas novas classes dão suporte à inclusão de estimadores XGBoost no pipeline SparkML. Para obter detalhes sobre API, consulte o documentoXGBoost Python spark API.

Requisitos

Databricks Runtime 12,0 MLe acima.

Parâmetrosxgboost.spark

Os estimadores definidos no módulo xgboost.spark suportam a maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.

  • Os parâmetros do construtor da classe, do método fit e do método predict são praticamente idênticos aos do módulo xgboost.sklearn.
  • A nomenclatura, os valores e o padrão são, em sua maioria, idênticos aos descritos em XGBoost parameters.
  • As exceções são alguns parâmetros não suportados (como gpu_id, nthread, sample_weight, eval_set) e os parâmetros específicos do estimador pyspark que foram adicionados (como featuresCol, labelCol, use_gpu, validationIndicatorCol). Para obter detalhes, consulte a documentaçãoXGBoost Python Spark API.

Treinamento distribuído

Os estimadores do PySpark definidos no módulo xgboost.spark suportam o treinamento distribuído do XGBoost usando o parâmetro num_workers. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers como o número de concorrentes executando Spark tarefa durante o treinamento distribuído. Para usar todos os Spark slots de tarefa, defina num_workers=sc.defaultParallelism.

Por exemplo:

Python
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
nota
  • O senhor não pode usar o site mlflow.xgboost.autolog com o XGBoost distribuído. Para log um modelo xgboost Spark usando MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).
  • O senhor não pode usar o XGBoost distribuído em um clustering que tenha a autoescala ativada. Os novos nós do worker que começam nesse paradigma de escalonamento elástico não podem receber novos conjuntos de tarefas e permanecem parados. Para obter instruções sobre como desativar a autoescala, consulte Ativar autoescala.

Ativar a otimização para treinamento em recurso esparso dataset

PySpark Os estimadores definidos no módulo xgboost.spark suportam a otimização para treinamento no conjunto de dados com recurso esparso. Para permitir a otimização de conjuntos de recursos esparsos, o senhor precisa fornecer um dataset para o método fit que contenha uma coluna de recurso composta por valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro do estimador enable_sparse_data_optim como True. Além disso, você precisa definir o parâmetro missing como 0.0.

Por exemplo:

Python
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU treinamento

Os estimadores do PySpark definidos no módulo xgboost.spark suportam treinamento em GPUs. Defina o parâmetro use_gpu como True para ativar o treinamento da GPU.

nota

Para cada tarefa do Spark usada no treinamento distribuído do XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu é definido como True. Databricks recomenda usar o valor default de 1 para a configuração de clustering Spark spark.task.resource.gpu.amount. Caso contrário, as GPUs adicionais alocadas para essa Spark tarefa são paradas.

Por exemplo:

Python
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Solução de problemas

Durante o treinamento com vários nós, se o senhor encontrar uma mensagem NCCL failure: remote process exited or there was a network error, isso normalmente indica um problema com a comunicação de rede entre as GPUs. Esse problema surge quando a NCCL (NVIDIA Collective Communications biblioteca) não pode usar determinadas interfaces de rede para comunicação com a GPU.

Para resolver, defina o sparkConf do clustering para spark.executorEnv.NCCL_SOCKET_IFNAME como eth. Isso basicamente define a variável de ambiente NCCL_SOCKET_IFNAME como eth para todos os trabalhadores em um nó.

Exemplo de notebook

Este Notebook mostra o uso do pacote Python xgboost.spark com Spark MLlib.

PySpark-XGBoost Notebook

Open notebook in new tab

Guia de migração para o módulo obsoleto sparkdl.xgboost

  • Substitua from sparkdl.xgboost import XgboostRegressor por from xgboost.spark import SparkXGBRegressor e substitua from sparkdl.xgboost import XgboostClassifier por from xgboost.spark import SparkXGBClassifier.
  • Altere todos os nomes de parâmetros no construtor do estimador do estilo camelCase para o estilo snake_case. Por exemplo, altere XgboostRegressor(featuresCol=XXX) para SparkXGBRegressor(features_col=XXX).
  • Os parâmetros use_external_storage e external_storage_precision foram removidos. xgboost.spark Os estimadores usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo ineficiente de armazenamento externo. Para conjuntos de dados extremamente grandes, o site Databricks recomenda que o senhor aumente o parâmetro num_workers, o que faz com que cada treinamento tarefa particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuração num_workers = sc.defaultParallelism, que define num_workers como o número total de Spark slots de tarefa no clustering.
  • Para os estimadores definidos em xgboost.spark, a configuração num_workers=1 executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela configuração de clustering Spark spark.task.cpus, que é 1 por default. Para usar mais núcleos de CPU para treinar o modelo, aumente num_workers ou spark.task.cpus. Você não pode definir o parâmetro nthread ou n_jobs para estimadores definidos em xgboost.spark. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote obsoleto sparkdl.xgboost.

Converta o modelo sparkdl.xgboost em modelo xgboost.spark

sparkdl.xgboost os modelos são salvos em um formato diferente dos modelos xgboost.spark e têm configurações de parâmetros diferentes. Use a seguinte função utilidades para converter o modelo:

Python
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.

:return
A `xgboost.spark` model instance
"""

def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key

xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}

booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)

Se você tiver um modelo pyspark.ml.PipelineModel contendo um modelo sparkdl.xgboost como último estágio, você pode substituir o estágio do modelo sparkdl.xgboost por o modelo xgboost.spark convertido.

Python
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)