XGBoost モデルの分散トレーニング xgboost.spark
プレビュー
この機能は パブリック プレビュー段階です。
Python パッケージ xgboost>=1.7 には、新しいモジュール xgboost.spark
が含まれています。 このモジュールには、 xgboost PySpark 推定器 xgboost.spark.SparkXGBRegressor
、 xgboost.spark.SparkXGBClassifier
、および xgboost.spark.SparkXGBRanker
が含まれています。 これらの新しいクラスは、SparkML パイプラインに XGBoost 推定器を含めることをサポートしています。 API詳細については、XGBoost Python Spark API のドキュメントを参照してください。
必要条件
Databricks Runtime 12.0 ML 以降。
xgboost.spark
パラメーター
xgboost.spark
モジュールで定義された推定器は、標準 XGBoostで使用されるのと同じパラメーターと引数のほとんどをサポートします。
- クラス コンストラクター、
fit
メソッド、およびpredict
メソッドのパラメーターは、xgboost.sklearn
モジュールのパラメーターとほぼ同じです。 - 名前付け、値、およびデフォルトは、 XGBoost パラメーターで説明されているものとほぼ同じです。
- 例外は、サポートされていないいくつかのパラメーター (
gpu_id
、nthread
、sample_weight
、eval_set
など) と、追加されたpyspark
推定器固有のパラメーター (featuresCol
、labelCol
、use_gpu
、validationIndicatorCol
など) です。 詳細については、 XGBoost Python Spark API のドキュメントを参照してください。
分散トレーニング
xgboost.spark
モジュールで定義されているPySpark推定器は、num_workers
パラメーターを使用した分散XGBoostトレーニングをサポートします。分散トレーニングを使用するには、分類子またはリグレッサーを作成し、分散トレーニング中にnum_workers
を並列実行中の Spark タスクの数に設定します。 すべての Spark タスク スロットを使用するには、 num_workers=sc.defaultParallelism
を設定します。
例えば:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
mlflow.xgboost.autolog
は分散 XGBoost では使用できません。MLflow を使用して xgboost Spark モデルをログに記録するには、mlflow.spark.log_model(spark_xgb_model, artifact_path)
を使用します。- オートスケールが有効になっているクラスターでは、分散 XGBoost を使用することはできません。 このエラスティック スケーリング パラダイムで開始する新しいワーカー ノードは、新しいタスク セットを受け取ることができず、アイドル状態のままになります。 オートスケールを無効にする手順については、「 オートスケールを有効にする」を参照してください。
スパース特徴データセットでのトレーニングの最適化を有効にする
xgboost.spark
モジュールで定義されている PySpark Estimator は、スパース特徴を持つデータセットでのトレーニングの最適化をサポートします。スパース特徴セットの最適化を有効にするには、pyspark.ml.linalg.SparseVector
型の値で構成される features 列を含むデータセットを fit
メソッドに提供し、推定器パラメーター の enable_sparse_data_optim
を True
に設定する必要があります。さらに、 missing
パラメーターを 0.0
に設定する必要があります。
例えば:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
GPU トレーニング
xgboost.spark
モジュールで定義されているPySpark推定器は、GPUでのトレーニングをサポートします。パラメーター use_gpu
を True
に設定して、GPU トレーニングを有効にします。
XGBoost 分散トレーニングで使用される Spark タスクごとに、 use_gpu
引数が True
に設定されている場合、トレーニングで使用される GPU は 1 つだけです。 Databricks 、 Spark クラスター構成spark.task.resource.gpu.amount
には、デフォルトの値 1
を使用することをお勧めします。 それ以外の場合、この Spark タスクに割り当てられた追加の GPU はアイドル状態になります。
例えば:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
トラブルシューティング
マルチノード トレーニング中に NCCL failure: remote process exited or there was a network error
メッセージが表示された場合、通常は GPU 間のネットワーク通信に問題があることを示しています。 この問題は、NCCL (NVIDIA Collective Communications ライブラリ) が特定のネットワーク インターフェイスを GPU 通信に使用できない場合に発生します。
解決するには、クラスターの sparkConf for spark.executorEnv.NCCL_SOCKET_IFNAME
を eth
に設定します。 これにより、基本的に、ノード内のすべてのワーカーの環境変数 THE NCCL_SOCKET_IFNAME
が eth
に設定されます。
ノートブックの例
このノートブックでは、Spark MLlib で xgboost.spark
Python パッケージを使用する方法を示します。
PySpark-XGBoost ノートブック
非推奨の sparkdl.xgboost
モジュールの移行ガイド
from sparkdl.xgboost import XgboostRegressor
をfrom xgboost.spark import SparkXGBRegressor
に置き換え、from sparkdl.xgboost import XgboostClassifier
をfrom xgboost.spark import SparkXGBClassifier
に置き換えます。- 推定器コンストラクター内のすべてのパラメーター名をキャメルケース形式からsnake_caseスタイルに変更します。 たとえば、
XgboostRegressor(featuresCol=XXX)
をSparkXGBRegressor(features_col=XXX)
に変更します。 - パラメーター
use_external_storage
とexternal_storage_precision
は削除されました。xgboost.spark
推定器は、DMatrix データ反復 API を使用してメモリをより効率的に使用します。 非効率的な外部ストレージモードを使用する必要はもうありません。 非常に大きなデータセットの場合、Databricks では、num_workers
パラメーターを増やして、各トレーニング タスクがデータをより小さく、管理しやすいデータ パーティションに分割することをお勧めします。 クラスター内の Spark タスクスロットの合計数にnum_workers
を設定するnum_workers = sc.defaultParallelism
の設定を検討します。 xgboost.spark
で定義されている推定器の場合、num_workers=1
を設定すると、1 つの Spark タスクを使用してモデルトレーニングが実行されます。これは、 Spark クラスター構成設定spark.task.cpus
で指定された CPU コアの数 (デフォルトによる 1) を利用します。 モデルのトレーニングに使用する CPU コアを増やすには、num_workers
またはspark.task.cpus
を増やします。xgboost.spark
で定義されている推定器のnthread
パラメーターまたはn_jobs
パラメーターは設定できません。この動作は、非推奨のsparkdl.xgboost
パッケージで定義された推定器の以前の動作とは異なります。
sparkdl.xgboost
モデルをxgboost.spark
モデルに変換する
sparkdl.xgboost
モデルは xgboost.spark
モデルとは異なる形式で保存され、
異なるパラメーター設定。 以下を使用します
モデルを変換するためのユーティリティ関数:
def convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls,
sparkdl_xgboost_model,
):
"""
:param xgboost_spark_estimator_cls:
`xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
:param sparkdl_xgboost_model:
`sparkdl.xgboost` model instance e.g. the instance of
`sparkdl.xgboost.XgboostRegressorModel` type.
:return
A `xgboost.spark` model instance
"""
def convert_param_key(key):
from xgboost.spark.core import _inverse_pyspark_param_alias_map
if key == "baseMarginCol":
return "base_margin_col"
if key in _inverse_pyspark_param_alias_map:
return _inverse_pyspark_param_alias_map[key]
if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
return None
return key
xgboost_spark_params_dict = {}
for param in sparkdl_xgboost_model.params:
if param.name == "arbitraryParamsDict":
continue
if sparkdl_xgboost_model.isDefined(param):
xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)
xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))
xgboost_spark_params_dict = {
convert_param_key(k): v
for k, v in xgboost_spark_params_dict.items()
if convert_param_key(k) is not None
}
booster = sparkdl_xgboost_model.get_booster()
booster_bytes = booster.save_raw("json")
booster_config = booster.save_config()
estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))
# Example
from xgboost.spark import SparkXGBRegressor
new_model = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=model,
)
sparkdl.xgboost
モデルを含む pyspark.ml.PipelineModel
モデルがあれば
最後のステージでは、sparkdl.xgboost
modelのステージを次のように置き換えることができます
変換された xgboost.spark
モデル。
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
xgboost_spark_estimator_cls=SparkXGBRegressor,
sparkdl_xgboost_model=pipeline_model.stages[-1],
)