Treinamento distribuído de modelos XGBoost usando xgboost.spark
Visualização
Esse recurso está em Public Preview.
O pacote Python xgboost>=1.7 contém um novo módulo xgboost.spark
. Esse módulo inclui os estimadores do xgboost PySpark xgboost.spark.SparkXGBRegressor
, xgboost.spark.SparkXGBClassifier
e xgboost.spark.SparkXGBRanker
. Essas novas classes dão suporte à inclusão de estimadores XGBoost no pipeline SparkML. Para obter detalhes sobre API, consulte o documentoXGBoost Python spark API.
Requisitos
Databricks Runtime 12,0 MLe acima.
Parâmetrosxgboost.spark
Os estimadores definidos no módulo xgboost.spark
suportam a maioria dos mesmos parâmetros e argumentos usados no XGBoost padrão.
- Os parâmetros do construtor da classe, do método
fit
e do métodopredict
são praticamente idênticos aos do móduloxgboost.sklearn
. - A nomenclatura, os valores e o padrão são, em sua maioria, idênticos aos descritos em XGBoost parameters.
- As exceções são alguns parâmetros não suportados (como
gpu_id
,nthread
,sample_weight
,eval_set
) e os parâmetros específicos do estimadorpyspark
que foram adicionados (comofeaturesCol
,labelCol
,use_gpu
,validationIndicatorCol
). Para obter detalhes, consulte a documentaçãoXGBoost Python Spark API.
Treinamento distribuído
Os estimadores do PySpark definidos no módulo xgboost.spark
suportam o treinamento distribuído do XGBoost usando o parâmetro num_workers
. Para usar o treinamento distribuído, crie um classificador ou regressor e defina num_workers
como o número de concorrentes executando Spark tarefa durante o treinamento distribuído. Para usar todos os Spark slots de tarefa, defina num_workers=sc.defaultParallelism
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
- O senhor não pode usar o site
mlflow.xgboost.autolog
com o XGBoost distribuído. Para log um modelo xgboost Spark usando MLflow, usemlflow.spark.log_model(spark_xgb_model, artifact_path)
. - O senhor não pode usar o XGBoost distribuído em um clustering que tenha a autoescala ativada. Os novos nós do worker que começam nesse paradigma de escalonamento elástico não podem receber novos conjuntos de tarefas e permanecem parados. Para obter instruções sobre como desativar a autoescala, consulte Ativar autoescala.
Ativar a otimização para treinamento em recurso esparso dataset
PySpark Os estimadores definidos no módulo xgboost.spark
suportam a otimização para treinamento no conjunto de dados com recurso esparso.
Para permitir a otimização de conjuntos de recursos esparsos, o senhor precisa fornecer um dataset para o método fit
que contenha uma coluna de recurso composta por valores do tipo pyspark.ml.linalg.SparseVector
e definir o parâmetro do estimador enable_sparse_data_optim
como True
. Além disso, você precisa definir o parâmetro missing
como 0.0
.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU treinamento
Os estimadores do PySpark definidos no módulo xgboost.spark
suportam treinamento em GPUs. Defina o parâmetro use_gpu
como True
para ativar o treinamento da GPU.
Para cada tarefa do Spark usada no treinamento distribuído do XGBoost, apenas uma GPU é usada no treinamento quando o argumento use_gpu
é definido como True
. Databricks recomenda usar o valor default de 1
para a configuração de clustering Spark spark.task.resource.gpu.amount
. Caso contrário, as GPUs adicionais alocadas para essa Spark tarefa são paradas.
Por exemplo:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Solução de problemas
Durante o treinamento com vários nós, se o senhor encontrar uma mensagem NCCL failure: remote process exited or there was a network error
, isso normalmente indica um problema com a comunicação de rede entre as GPUs. Esse problema surge quando a NCCL (NVIDIA Collective Communications biblioteca) não pode usar determinadas interfaces de rede para comunicação com a GPU.
Para resolver, defina o sparkConf do clustering para spark.executorEnv.NCCL_SOCKET_IFNAME
como eth
. Isso basicamente define a variável de ambiente NCCL_SOCKET_IFNAME
como eth
para todos os trabalhadores em um nó.
Exemplo de notebook
Este Notebook mostra o uso do pacote Python xgboost.spark
com Spark MLlib.
PySpark-XGBoost Notebook
Guia de migração para o módulo obsoleto sparkdl.xgboost
- Substitua
from sparkdl.xgboost import XgboostRegressor
porfrom xgboost.spark import SparkXGBRegressor
e substituafrom sparkdl.xgboost import XgboostClassifier
porfrom xgboost.spark import SparkXGBClassifier
. - Altere todos os nomes de parâmetros no construtor do estimador do estilo camelCase para o estilo snake_case. Por exemplo, altere
XgboostRegressor(featuresCol=XXX)
paraSparkXGBRegressor(features_col=XXX)
. - Os parâmetros
use_external_storage
eexternal_storage_precision
foram removidos.xgboost.spark
Os estimadores usam a API de iteração de dados DMatrix para usar a memória com mais eficiência. Não é mais necessário usar o modo ineficiente de armazenamento externo. Para conjuntos de dados extremamente grandes, o site Databricks recomenda que o senhor aumente o parâmetronum_workers
, o que faz com que cada treinamento tarefa particione os dados em partições de dados menores e mais gerenciáveis. Considere a configuraçãonum_workers = sc.defaultParallelism
, que definenum_workers
como o número total de Spark slots de tarefa no clustering. - Para os estimadores definidos em
xgboost.spark
, a configuraçãonum_workers=1
executa o treinamento do modelo usando uma única tarefa do Spark. Isso utiliza o número de núcleos de CPU especificado pela configuração de clustering Sparkspark.task.cpus
, que é 1 por default. Para usar mais núcleos de CPU para treinar o modelo, aumentenum_workers
ouspark.task.cpus
. Você não pode definir o parâmetronthread
oun_jobs
para estimadores definidos emxgboost.spark
. Esse comportamento é diferente do comportamento anterior dos estimadores definidos no pacote obsoletosparkdl.xgboost
.
Converta o modelo sparkdl.xgboost
em modelo xgboost.spark
sparkdl.xgboost
os modelos são salvos em um formato diferente dos modelos xgboost.spark
e têm
configurações de parâmetros diferentes. Use a seguinte função
utilidades para converter o modelo:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
Se você tiver um modelo pyspark.ml.PipelineModel
contendo um modelo sparkdl.xgboost
como
último estágio, você pode substituir o estágio do modelo sparkdl.xgboost
por
o modelo xgboost.spark
convertido.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)