Treinamento distribuído de modelos XGBoost usando xgboost.spark
Visualização
Esse recurso está em Public Preview.
O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark. Esse módulo inclui os estimadores do xgboost PySpark xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier e xgboost.spark.SparkXGBRanker. Essas novas classes dão suporte à inclusão de estimadores XGBoost no pipeline SparkML. Para obter detalhes sobre API, consulte o documentoXGBoost Python spark API.
Requisitos
Databricks Runtime 12,0 MLe acima.
Parâmetrosxgboost.spark
Os estimadores definidos no módulo xgboost.spark suportam a maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros do construtor da classe, do método fite do métodopredictsão praticamente idênticos aos do móduloxgboost.sklearn.
- A nomenclatura, os valores e o padrão são, em sua maioria, idênticos aos descritos em XGBoost parameters.
- As exceções são alguns parâmetros não suportados (como gpu_id,nthread,sample_weight,eval_set) e os parâmetros específicos do estimadorpysparkque foram adicionados (comofeaturesCol,labelCol,use_gpu,validationIndicatorCol). Para obter detalhes, consulte a documentaçãoXGBoost Python Spark API.
Treinamento distribuído
Os estimadores do PySpark definidos no módulo xgboost.spark suportam o treinamento distribuído do XGBoost usando o parâmetro num_workers. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers como o número de concorrentes executando Spark tarefa durante o treinamento distribuído. Para usar todos os Spark slots de tarefa, defina num_workers=sc.defaultParallelism.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
- O senhor não pode usar o site mlflow.xgboost.autologcom o XGBoost distribuído. Para log um modelo xgboost Spark usando MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path).
- O senhor não pode usar o XGBoost distribuído em um clustering que tenha a autoescala ativada. Os novos nós do worker que começam nesse paradigma de escalonamento elástico não podem receber novos conjuntos de tarefas e permanecem parados. Para obter instruções sobre como desativar a autoescala, consulte Ativar autoescala.
Ativar a otimização para treinamento em recurso esparso dataset
PySpark Os estimadores definidos no módulo xgboost.spark suportam a otimização para treinamento no conjunto de dados com recurso esparso.
Para permitir a otimização de conjuntos de recursos esparsos, o senhor precisa fornecer um dataset para o método fit que contenha uma coluna de recurso composta por valores do tipo pyspark.ml.linalg.SparseVector e definir o parâmetro do estimador enable_sparse_data_optim como True. Além disso, você precisa definir o parâmetro missing como 0.0.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU treinamento
Os estimadores do PySpark definidos no módulo xgboost.spark suportam treinamento em GPUs. Defina o parâmetro use_gpu como True para ativar o treinamento da GPU.
Para cada tarefa do Spark usada no treinamento distribuído do XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu é definido como True. Databricks recomenda usar o valor default de 1 para a configuração de clustering Spark spark.task.resource.gpu.amount. Caso contrário, as GPUs adicionais alocadas para essa Spark tarefa são paradas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solução de problemas
Durante o treinamento com vários nós, se o senhor encontrar uma mensagem NCCL failure: remote process exited or there was a network error, isso normalmente indica um problema com a comunicação de rede entre as GPUs. Esse problema surge quando a NCCL (NVIDIA Collective Communications biblioteca) não pode usar determinadas interfaces de rede para comunicação com a GPU.
Para resolver, defina o sparkConf do clustering para spark.executorEnv.NCCL_SOCKET_IFNAME como eth. Isso basicamente define a variável de ambiente NCCL_SOCKET_IFNAME como eth para todos os trabalhadores em um nó.
Exemplo de notebook
Este Notebook mostra o uso do pacote Python xgboost.spark com Spark MLlib.
PySpark-XGBoost Notebook
Guia de migração para o módulo obsoleto sparkdl.xgboost
- Substitua from sparkdl.xgboost import XgboostRegressorporfrom xgboost.spark import SparkXGBRegressore substituafrom sparkdl.xgboost import XgboostClassifierporfrom xgboost.spark import SparkXGBClassifier.
- Altere todos os nomes de parâmetros no construtor do estimador do estilo camelCase para o estilo snake_case. Por exemplo, altere XgboostRegressor(featuresCol=XXX)paraSparkXGBRegressor(features_col=XXX).
- Os parâmetros use_external_storageeexternal_storage_precisionforam removidos.xgboost.sparkOs estimadores usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo ineficiente de armazenamento externo. Para conjuntos de dados extremamente grandes, o site Databricks recomenda que o senhor aumente o parâmetronum_workers, o que faz com que cada treinamento tarefa particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism, que definenum_workerscomo o número total de Spark slots de tarefa no clustering.
- Para os estimadores definidos em xgboost.spark, a configuraçãonum_workers=1executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela configuração de clustering Sparkspark.task.cpus, que é 1 por default. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workersouspark.task.cpus. Você não pode definir o parâmetronthreadoun_jobspara estimadores definidos emxgboost.spark. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote obsoletosparkdl.xgboost.
Converta o modelo sparkdl.xgboost em modelo xgboost.spark
sparkdl.xgboost os modelos são salvos em um formato diferente dos modelos xgboost.spark e têm
configurações de parâmetros diferentes. Use a seguinte função
utilidades para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.
  :return
      A `xgboost.spark` model instance
  """
  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key
  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }
  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)
Se você tiver um modelo pyspark.ml.PipelineModel contendo um modelo sparkdl.xgboost como
último estágio, você pode substituir o estágio do modelo sparkdl.xgboost por
o modelo xgboost.spark convertido.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)