Treinamento distribuído de modelos XGBoost usando xgboost.spark

Visualização

Este recurso está em visualização pública.

O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark. Este módulo inclui os estimadores xgboost PySpark xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker. Essas novas classes suportam a inclusão de estimadores XGBoost no pipeline SparkML. Para obter detalhes da API, consulte o documento da API Spark XGBoost Python .

Requisitos

Databricks Runtime 12,0 MLe acima.

xgboost.spark parâmetros

Os estimadores definidos no módulo xgboost.spark suportam a maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.

  • Os parâmetros para o construtor de classe, método fit e método predict são amplamente idênticos aos do módulo xgboost.sklearn .

  • A nomenclatura, os valores e default são praticamente idênticos aos descritos nos parâmetros do XGBoost.

  • As exceções são alguns parâmetros sem suporte (como gpu_id, nthread, sample_weight, eval_set) e os pyspark parâmetros específicos do estimador que foram adicionados (como featuresCol, labelCol, use_gpu, validationIndicatorCol). Para obter detalhes, consulte a documentação Spark API .

treinamento distribuído

Os estimadores do PySpark definidos no módulo xgboost.spark suportam o treinamento distribuído do XGBoost usando o parâmetro num_workers. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers como o número de concorrentes executando Spark tarefa durante o treinamento distribuído. Para usar todos os Spark slots de tarefa, defina num_workers=sc.defaultParallelism.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

Observação

  • Você não pode usar mlflow.xgboost.autolog com XGBoost distribuído. Para logs um modelo xgboost Spark usando MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).

  • Você não pode usar o XGBoost distribuído em clusters com autoscale habilitado. Novos nós worker que começam neste paradigma de escala elástica não podem receber novos conjuntos de tarefas e permanecem parados. Para obter instruções sobre como desabilitar autoscale, consulte Habilitar autoscale.

Ativar otimização para treinamento em conjunto de dados de recursos esparsos

Estimadores PySpark definidos no módulo xgboost.spark oferecem suporte à otimização para treinamento em dataset com recursos esparsos. Para ativar a otimização de conjuntos de recursos esparsos, você precisa fornecer um dataset para o método fit que contém uma coluna de recursos que consiste em valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro do estimador enable_sparse_data_optim como True. Além disso, você precisa definir o parâmetro missing como 0.0.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU treinamento

Os estimadores PySpark definidos no módulo xgboost.spark dão suporte ao treinamento em GPUs. Defina o parâmetro use_gpu como True para ativar o treinamento de GPU.

Observação

Para cada tarefa Spark usada no treinamento distribuído XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu é definido como True. Databricks recomenda usar o valor default de 1 para a configuração clusters Spark spark.task.resource.gpu.amount. Caso contrário, as GPUs adicionais alocadas para esta tarefa do Spark serão interrompidas.

Por exemplo:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

Solução de problemas

Durante o treinamento com vários nós, se o senhor encontrar uma mensagem NCCL failure: remote process exited or there was a network error, isso normalmente indica um problema com a comunicação de rede entre as GPUs. Esse problema surge quando a NCCL (NVIDIA Collective Communications biblioteca) não pode usar determinadas interfaces de rede para comunicação com a GPU.

Para resolver, defina o sparkConf do cluster para spark.executorEnv.NCCL_SOCKET_IFNAME como eth. Isso basicamente define a variável de ambiente NCCL_SOCKET_IFNAME para eth para todos os trabalhadores em um nó.

Notebook de exemplo

Este Notebook mostra o uso do pacote Python xgboost.spark com Spark MLlib.

Notebook PySpark-XGBoost

Abra o bloco de anotações em outra guia

Guia de migração para o módulo obsoleto sparkdl.xgboost

  • Substitua from sparkdl.xgboost import XgboostRegressor por from xgboost.spark import SparkXGBRegressor e substitua from sparkdl.xgboost import XgboostClassifier por from xgboost.spark import SparkXGBClassifier.

  • Altere todos os nomes de parâmetro no construtor estimador do estilo camelCase para o estilo snake_case. Por exemplo, altere XgboostRegressor(featuresCol=XXX) para SparkXGBRegressor(features_col=XXX).

  • Os parâmetros use_external_storage e external_storage_precision foram removidos. Os estimadores xgboost.spark usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não há mais necessidade de usar o ineficiente modo de armazenamento externo. Para conjuntos de dados extremamente grandes, o site Databricks recomenda que o senhor aumente o parâmetro num_workers, o que faz com que cada treinamento tarefa particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuração num_workers = sc.defaultParallelism, que define num_workers como o número total de Spark slots de tarefa em cluster.

  • Para os estimadores definidos em xgboost.spark, a configuração num_workers=1 executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela configuração Spark cluster spark.task.cpus, que é 1 por default. Para usar mais núcleos de CPU para ensinar o modelo, aumente num_workers ou spark.task.cpus. O senhor não pode definir o parâmetro nthread ou n_jobs para os estimadores definidos em xgboost.spark. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote sparkdl.xgboost obsoleto.

Converter modelo sparkdl.xgboost em modelo xgboost.spark

sparkdl.xgboost os modelos são salvos em um formato diferente dos modelos xgboost.spark e têm configurações de parâmetros diferentes. Use a seguinte função de utilidades para converter o modelo:

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

Se você tiver um modelo pyspark.ml.PipelineModel contendo um modelo sparkdl.xgboost como o último estágio, poderá substituir o estágio do modelo sparkdl.xgboost pelo modelo xgboost.spark convertido.

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)