xgboost.sparkを使用したXGBoostモデルの分散トレーニング

プレビュー

この機能はパブリックプレビュー段階です。

Python パッケージ xgboost>=1.7 には、新しいモジュール xgboost.sparkが含まれています。 このモジュールには、xgboost PySpark 推定器 xgboost.spark.SparkXGBRegressorxgboost.spark.SparkXGBClassifier、および xgboost.spark.SparkXGBRankerが含まれています。 これらの新しいクラスは、SparkML パイプラインに XGBoost 推定器を含めることをサポートします。 API の詳細については、 XGBoost Python スパーク API ドキュメントを参照してください。

要件

Databricks Runtime 12.0 機械学習以上。

xgboost.spark パラメーター

xgboost.spark モジュールで定義されている推定器は、標準の XGBoost で使用されるのと同じパラメーターと引数のほとんどをサポートします。

  • クラス コンストラクター、 fit メソッド、および predict メソッドのパラメーターは、 xgboost.sklearn モジュールのパラメーターとほぼ同じです。

  • 名前付け、値、およびデフォルトは、 XGBoost パラメーターで説明されているものとほとんど同じです。

  • 例外は、サポートされていないいくつかのパラメーター ( gpu_idnthreadsample_weighteval_setなど) と、追加された pyspark 推定器固有のパラメーター ( featuresCollabelColuse_gpuvalidationIndicatorColなど) です。 詳細については、 XGBoost Python Spark API のドキュメントを参照してください。

分散トレーニング

xgboost.spark モジュールで定義されたPySpark推定子は、num_workers 引数を使用して分散XGBoostトレーニングをサポートします。 分散トレーニングを使用するには、分類器または回帰器を作成し、分散トレーニング中に並列実行されるSparkタスクの数を num_workers に設定します。 すべての Spark タスク スロットを使用するには、 num_workers=sc.defaultParallelismを設定します。

例:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)

  • 分散 XGBoost で mlflow.xgboost.autolog を使用することはできません。 MLflow を使用して xgboost Spark モデルをログに記録するには、 mlflow.spark.log_model(spark_xgb_model, artifact_path)を使用します。

  • 分散 XGBoost は、オートスケールが有効になっているクラスターでは使用できません。 このエラスティック・スケーリング・パラダイムで開始される新しいワーカー・ノードは、新しいタスク・セットを受け取ることができず、アイドル状態のままになります。 オートスケールを無効にする手順については、「 オートスケールを有効にする」を参照してください。

疎な特徴量データセットでのトレーニングの最適化の有効化

xgboost.spark モジュールで定義された PySpark 推定器は、スパース特徴を持つデータセットでのトレーニングの最適化をサポートします。スパース特徴セットの最適化を有効にするには、 pyspark.ml.linalg.SparseVector 型の値で構成される特徴列を含むデータセットを fit メソッドに提供し、推定器パラメーター enable_sparse_data_optimTrueに設定する必要があります。さらに、 missing パラメーターを 0.0に設定する必要があります。

例:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)

GPU トレーニング

xgboost.spark モジュールで定義されている PySpark 推定器は、GPU でのトレーニングをサポートします。パラメーター use_gpuTrue に設定して、GPU トレーニングを有効にします。

XGBoost 分散トレーニングで使用される各 Spark タスクについて、 use_gpu 引数が Trueに設定されている場合、トレーニングで使用される GPU は 1 つだけです。 Databricks では、Spark クラスター構成 spark.task.resource.gpu.amountに既定値の 1 を使用することをお勧めします。それ以外の場合、この Spark タスクに割り当てられた追加の GPU はアイドル状態になります。

例:

from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)

トラブルシューティング

マルチノード トレーニング中にNCCL failure: remote process exited or there was a network errorメッセージが表示された場合、通常は GPU 間のネットワーク通信に問題があることを示しています。 この問題は、NCCL (NVIDIA Collective Communications ライブラリ) が GPU 通信に特定のネットワーク インターフェイスを使用できない場合に発生します。

解決するには、クラスターの sparkConf をspark.executorEnv.NCCL_SOCKET_IFNAMEに対してethに設定します。 これは基本的に、ノード内のすべてのワーカーの環境変数NCCL_SOCKET_IFNAMEethに設定します。

ノートブックの例

このノートブックでは、Spark MLlib で xgboost.spark Python パッケージを使用する方法を示します。

PySpark-XGBoost ノートブック

ノートブックを新しいタブで開く

非推奨の sparkdl.xgboost モジュールの移行ガイド

  • from sparkdl.xgboost import XgboostRegressorfrom xgboost.spark import SparkXGBRegressor に置き換え、 from sparkdl.xgboost import XgboostClassifierfrom xgboost.spark import SparkXGBClassifierに置き換えます。

  • 推定器コンストラクターのすべてのパラメーター名を camelCase スタイルから snake_case スタイルに変更します。 たとえば、 XgboostRegressor(featuresCol=XXX)SparkXGBRegressor(features_col=XXX)に変更します。

  • use_external_storageexternal_storage_precisionは削除されました。 xgboost.spark推定子は、DMatrix データ反復 API を使用してメモリをより効率的に使用します。 非効率な外部ストレージモードを使用する必要はなくなりました。 非常に大きなデータセットの場合、 Databricks 、num_workers 引数を増やすことをお勧めします。これにより、各トレーニング タスクでデータがより小さく管理しやすいデータ パーティションに分割されます。 num_workers = sc.defaultParallelismを設定することを検討してください。これにより、 num_workersがクラスター内の Spark タスク スロットの合計数に設定されます。

  • xgboost.sparkで定義された推定器の場合、 num_workers=1を設定すると、単一の Spark タスクを使用してモデル トレーニングが実行されます。 これは、Spark クラスター構成設定spark.task.cpusで指定された CPU コアの数 (デフォルトでは 1) を利用します。 より多くの CPU コアを使用してモデルをトレーニングするには、 num_workersまたはspark.task.cpus増やします。 xgboost.sparkで定義された推定値に対してnthreadまたはn_jobsを設定することはできません。 この動作は、非推奨の sparkdl.xgboost パッケージで定義された推定器の以前の動作とは異なります。

sparkdl.xgboost モデルを xgboost.spark モデルに変換する

sparkdl.xgboost モデルは xgboost.spark モデルとは異なる形式で保存され、 パラメーター設定も異なります。 次のユーティリティ関数を使用して、モデルを変換します。

def convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls,
  sparkdl_xgboost_model,
):
  """
  :param xgboost_spark_estimator_cls:
      `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
  :param sparkdl_xgboost_model:
      `sparkdl.xgboost` model instance e.g. the instance of
       `sparkdl.xgboost.XgboostRegressorModel` type.

  :return
      A `xgboost.spark` model instance
  """

  def convert_param_key(key):
    from xgboost.spark.core import _inverse_pyspark_param_alias_map
    if key == "baseMarginCol":
      return "base_margin_col"
    if key in _inverse_pyspark_param_alias_map:
      return _inverse_pyspark_param_alias_map[key]
    if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
      return None
    return key

  xgboost_spark_params_dict = {}
  for param in sparkdl_xgboost_model.params:
    if param.name == "arbitraryParamsDict":
      continue
    if sparkdl_xgboost_model.isDefined(param):
      xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

  xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

  xgboost_spark_params_dict = {
    convert_param_key(k): v
    for k, v in xgboost_spark_params_dict.items()
    if convert_param_key(k) is not None
  }

  booster = sparkdl_xgboost_model.get_booster()
  booster_bytes = booster.save_raw("json")
  booster_config = booster.save_config()
  estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
  sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
  return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

new_model = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=model,
)

最後のステージとして sparkdl.xgboost モデルを含む pyspark.ml.PipelineModel モデルがある場合は、 sparkdl.xgboost モデルのステージを変換された xgboost.spark モデルに置き換えることができます。

pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
  xgboost_spark_estimator_cls=SparkXGBRegressor,
  sparkdl_xgboost_model=pipeline_model.stages[-1],
)