TorchDistributorを使った分散トレーニング

この記事では、 TorchDistributor を使用して PyTorch 機械学習モデルで分散トレーニングを実行する方法について説明します。

TorchDistributor は PySpark のオープンソース モジュールであり、ユーザーが Spark クラスターで PyTorch を使用して分散トレーニングを行うのに役立つため、PyTorch トレーニング ジョブを Spark ジョブとして起動できます。 内部的には、ワーカー間の環境と通信チャネルを初期化し、CLIコマンド torch.distributed.run を利用してワーカーノード間で分散トレーニングを実行します。

TorchDistributor API は、次の表に示すメソッドをサポートしています。

メソッドとシグネチャー

説明

init(self, num_processes, local_mode, use_gpu)

TorchDistributorのインスタンスを作成します。

run(self, main, *args)

main が関数の場合は main(**kwargs) を呼び出して分散トレーニングを実行し、main がファイル パスの場合は CLI コマンド torchrun main *args 実行します。

要件

  • Spark 3.4

  • Databricks Runtime 13.0 機械学習以上

ノートブックの開発ワークフロー

モデルの作成とトレーニング プロセスが完全にローカル コンピューター上のノートブックまたは Databricks ノートブックから行われる場合は、小さな変更を加えるだけで、コードを分散トレーニング用に準備できます。

  1. 単一ノード コードを準備します。 PyTorch、PyTorch Lightning、または HuggingFace Trainer API のような PyTorch/PyTorch Lightning に基づくその他のフレームワークを使用して、単一ノード コードを準備してテストします。

  2. 標準の分散トレーニング用のコードを準備します。 単一プロセスのトレーニングを分散トレーニングに変換する必要があります。この分散コードをすべて、 TorchDistributorで使用できる 1 つのトレーニング関数に含めます。

  3. トレーニング関数内にインポートを移動する: トレーニング関数内で import torchなどの必要なインポートを追加します。 そうすることで、一般的な酸洗いエラーを回避できます。 さらに、モデルとデータが関連付けられる device_id は、次の要素によって決まります。

    device_id = int(os.environ["LOCAL_RANK"])
    
  4. 分散トレーニングの開始: 目的のパラメーターを使用して TorchDistributor をインスタンス化し、 .run(*args) を呼び出してトレーニングを開始します。

トレーニング コード例を次に示します。

from pyspark.ml.torch.distributor import TorchDistributor

def train(learning_rate, use_gpu):
  import torch
  import torch.distributed as dist
  import torch.nn.parallel.DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader

  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)
  device = int(os.environ["LOCAL_RANK"]) if use_gpu  else "cpu"
  model = DDP(createModel(), **kwargs)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)

  output = train(model, loader, learning_rate)
  dist.cleanup()
  return output

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)

外部リポジトリからのトレーニングの移行

外部リポジトリに格納されている既存の分散トレーニング手順がある場合は、次の手順を実行して Databricks に簡単に移行できます。

  1. リポジトリをインポートする:外部リポジトリをDatabricks Git フォルダーとしてインポートします。

  2. 新しいノートブックを作成する リポジトリ内の新しい Databricks ノートブックを初期化します。

  3. 分散トレーニングの開始 ノートブックのセルで、次のように TorchDistributor 呼び出します。

from pyspark.ml.torch.distributor import TorchDistributor

train_file = "/path/to/train.py"
args = ["--learning_rate=0.001", "--batch_size=16"]
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train_file, *args)

トラブルシューティング

ノートブック ワークフローの一般的なエラーは、分散トレーニングの実行時にオブジェクトが見つからないか、ピッキングできないことです。 これは、ライブラリー・インポート・ステートメントが他のエグゼキューターに配布されていない場合に発生する可能性があります。

この問題を回避するには、 TorchDistributor(...).run(<func>) で呼び出されるトレーニング関数の先頭と、トレーニング メソッドで呼び出される他のユーザー定義関数の内部の両方に 、すべての import ステートメント ( import torchなど) を含めます。

CUDA 失敗: peer access is not supported between these two devices

これは、AWS 上の G5 GPU スイートの潜在的なエラーです。 このエラーを解決するには、トレーニング コードに次のスニペットを追加します。

import os
os.environ["NCCL_P2P_DISABLE"] = "1"

NCCL障害:ncclInternalError: Internal check failed.

マルチノード トレーニング中にこのエラーが発生した場合、通常は GPU 間のネットワーク通信に問題があることを示しています。 この問題は、NCCL (NVIDIA Collective Communications ライブラリ) が GPU 通信に特定のネットワーク インターフェイスを使用できない場合に発生します。

このエラーを解決するには、トレーニング コードに次のスニペットを追加して、プライマリ ネットワーク インターフェースを使用します。

import os
os.environ["NCCL_SOCKET_IFNAME"] = "eth0"

ノートブックの例

次のノートブックの例では、PyTorch を使用して分散トレーニングを実行する方法を示します。

Databricksノートブックでのエンドツーエンドの分散トレーニング

ノートブックを新しいタブで開く

Hugging Face モデルノートブックの分散ファインチューニング

ノートブックを新しいタブで開く

PyTorch ファイルノートブックでの分散トレーニング

ノートブックを新しいタブで開く

PyTorch Lightningノートブックを使用した分散トレーニング

ノートブックを新しいタブで開く