TorchDistributorによる分散トレーニング
この記事では、 TorchDistributor を使用して PyTorch ML モデルで分散トレーニングを実行する方法について説明します。
TorchDistributorは のオープンソースPySpark PyTorchSparkモジュールで、ユーザーが クラスターで を使用して分散トレーニングを行うのに役立つため、PyTorch トレーニング ジョブをSpark ジョブとして起動できます。内部的には、環境とワーカー間の通信チャネルを初期化し、 CLI コマンド torch.distributed.runを利用してワーカー ノード間で分散トレーニングを実行します。
TorchDistributor API は、次の表に示すメソッドをサポートしています。
| メソッドとシグネチャー | 説明 | 
|---|---|
| 
 | TorchDistributorのインスタンスを作成します。 | 
| 
 | 関数の場合は  | 
必要条件
- Spark 3.4
- Databricks Runtime 13.0 ML 以降
ノートブックの開発ワークフロー
モデルの作成とトレーニングのプロセスがすべてローカル コンピューター上のノートブックまたは Databricks ノートブックから行われる場合は、コードを分散トレーニングに備えるために小さな変更を加えるだけで済みます。
- 
単一ノードコードを準備します。 PyTorch、PyTorch Lightning、または PyTorch/PyTorch Lightning に基づくその他のフレームワーク (HuggingFace Trainer API など) を使用して、シングル ノード コードを準備してテストします。 
- 
標準の分散トレーニング用のコードを準備します。 シングルプロセストレーニングを分散トレーニングに変換する必要があります。この分散コードをすべて 1 つのトレーニング関数に含めて、 TorchDistributorで使用できます。
- 
トレーニング関数内にインポートを移動する: トレーニング関数内で import torchなどの必要なインポートを追加します。 そうすることで、一般的な酸洗いエラーを回避できます。 さらに、モデルとデータが関連付けられるdevice_idは、次の要素によって決まります。Pythondevice_id = int(os.environ["LOCAL_RANK"])
- 
分散トレーニングを開始する: 必要なパラメーターを使用して TorchDistributorをインスタンス化し、.run(*args)を呼び出してトレーニングを開始します。
トレーニング コードの例を次に示します。
from pyspark.ml.torch.distributor import TorchDistributor
def train(learning_rate, use_gpu):
  import torch
  import torch.distributed as dist
  import torch.nn.parallel.DistributedDataParallel as DDP
  from torch.utils.data import DistributedSampler, DataLoader
  backend = "nccl" if use_gpu else "gloo"
  dist.init_process_group(backend)
  device = int(os.environ["LOCAL_RANK"]) if use_gpu  else "cpu"
  model = DDP(createModel(), **kwargs)
  sampler = DistributedSampler(dataset)
  loader = DataLoader(dataset, sampler=sampler)
  output = train(model, loader, learning_rate)
  dist.cleanup()
  return output
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train, 1e-3, True)
外部リポジトリからトレーニングを移行する
既存の分散トレーニング手順が外部リポジトリに保存されている場合は、次の手順で Databricks に簡単に移行できます。
- リポジトリをインポートします。 外部リポジトリを Databricks Git フォルダーとしてインポートします。
- 新しいノートブックを作成する リポジトリ内で新しい Databricks ノートブックを初期化します。
- 分散トレーニングの開始 ノートブックのセルでは、次のように TorchDistributorを呼び出します。
from pyspark.ml.torch.distributor import TorchDistributor
train_file = "/path/to/train.py"
args = ["--learning_rate=0.001", "--batch_size=16"]
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(train_file, *args)
トラブルシューティング
ノートブックワークフローの一般的なエラーは、分散トレーニングの実行時にオブジェクトが見つからないか、ピクルス化されないことです。 これは、ライブラリのインポートステートメントが他のエグゼキューターに配布されていない場合に発生する可能性があります。
この問題を回避するには、すべてのインポートステートメント ( import torchなど) を、TorchDistributor(...).run(<func>) で呼び出されるトレーニング関数の先頭と、トレーニングメソッドで呼び出される他のユーザー定義関数の両方を含めます。
NCCL の失敗: ncclInternalError: Internal check failed.
マルチノード トレーニング中にこのエラーが発生した場合は、通常、GPU 間のネットワーク通信に問題があることを示しています。 この問題は、NCCL (NVIDIA Collective Communications ライブラリ) が特定のネットワーク インターフェイスを GPU 通信に使用できない場合に発生します。
このエラーを解決するには、トレーニング コードに次のスニペットを追加して、プライマリ ネットワーク インターフェイスを使用します。
import os
os.environ["NCCL_SOCKET_IFNAME"] = "eth0"
Glooの失敗: RuntimeError: Connection refused
このエラーは、GlooをCPUインスタンス上の分散トレーニングに使用しているときに発生する可能性があります。 このエラーを解決するには、トレーニング コードに次のスニペットを追加します。
import os
os.environ["GLOO_SOCKET_IFNAME"] = "eth0"
ノートブックの例
次のノートブックの例は、PyTorch を使用して分散トレーニングを実行する方法を示しています。