メインコンテンツまでスキップ

Ray Train での分散トレーニング

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

この例では、単一ノードで8基のH100 GPU全体にわたり、Ray TrainTorchTrainer を使用して分散データ並列ファインチューニングを実行します。ブートストラップスクリプトがノード上でRayクラスターを起動すると、Ray Train driverはGPUごとに1つのワーカーを起動し、モデルをDDPでラップし、データセットをワーカー全体に自動的にシャード化します。

公開モデル(Qwen2.5-3B)をファインチューニングします。Hugging Faceトークンがなくても、そのまま実行できます。

ワークロードでは、次の処理を行います:

  • ローカルプロジェクトを code_source: snapshot でアップロードします。
  • すべての8個のGPUを使用してRayヘッドを起動し、Rayトレーニングドライバーを実行します。
  • ray.train.torch.prepare_modelprepare_data_loaderを使用して、DDPラップ、 デバイス配置、および分散サンプリングを処理します。
  • MLflow にメトリクスを記録します。

前提条件

プロジェクト レイアウト

次のファイルを含むディレクトリを作成します。

Text
ray_train_distributed/
├── train.yaml # air workload config (inline dependencies + Ray bootstrap)
└── train_ray.py # Ray Train TorchTrainer driver + per-worker training

ステップ 1: ワークロード YAML を記述する

train.yaml 単一のGPU_8xH100ノードを必要とします。依存関係はenvironmentの下にインラインで宣言され(クライアントイメージversionとともに)、commandはノード上でRayクラスターを起動し、ドライバーを実行するため、ワークロードは個別のrequirements.yamlやランチャースクリプトを必要としません:

YAML
experiment_name: air-ray-train-distributed

environment:
version: '4'
dependencies:
- ray[default,train]>=2.30
- transformers>=4.45
- datasets>=3.0
- huggingface_hub>=0.34
# The base image ships fsspec 2023.5.0, which is too old for modern
# huggingface_hub and breaks dataset/model downloads. Pin a newer fsspec.
- fsspec>=2024.6.1

# 8 H100 on a single node. Ray Train launches one worker per GPU.
compute:
num_accelerators: 8
accelerator_type: GPU_8xH100

code_source:
type: snapshot
snapshot:
root_path: .

command: |
cd $CODE_SOURCE_PATH
RAY_HEAD_PORT=6379
GPUS_PER_NODE=${LOCAL_WORLD_SIZE:-8}
if [ "${NODE_RANK:-0}" = "0" ]; then
echo "NODE_RANK=0: starting Ray head with $GPUS_PER_NODE GPU(s)..."
ray start --head --port=$RAY_HEAD_PORT --num-gpus="$GPUS_PER_NODE" --dashboard-host=0.0.0.0
python train_ray.py
ray stop
else
echo "NODE_RANK=$NODE_RANK: connecting to Ray head at $MASTER_ADDR:$RAY_HEAD_PORT..."
for i in $(seq 1 12); do
if ray start --address="$MASTER_ADDR:$RAY_HEAD_PORT" --num-gpus="$GPUS_PER_NODE" --block 2>/dev/null; then
break
fi
echo "Attempt $i failed, retrying in 5s..."
sleep 5
done
fi

max_retries: 0
timeout_minutes: 90
env_variables:
NCCL_SOCKET_IFNAME: eth0

インラインcommandは、ノード上のすべてのGPUでRayヘッドを起動し、python train_ray.pyでドライバーを実行した後、クラスターを停止します。ヘッドに結合するワーカーブランチも含まれているため、ジョブを複数のノードにスケールしても、同じコマンドが機能し続けます。

ステップ2:Ray Train ドライバーを定義する

train_ray.py すべてのワーカーで実行される train_func と、クラスター内のすべての GPU を使用するように TorchTrainer を構成する main を定義します。prepare_model はモデルをDDPでラップし、ワーカーのGPUに移動します。prepare_data_loader は分散サンプラーを追加します。

Python
def train_func(config: dict):
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
model.config.use_cache = False
model = prepare_model(model) # DDP wrap + device placement

loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
loader = prepare_data_loader(loader) # distributed sampler + GPU transfer
optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])
...
ray.train.report({"loss": out.loss.item(), "step": step})


def main():
ray.init(address="auto")
total_gpus = int(ray.cluster_resources().get("GPU", 0))
trainer = TorchTrainer(
train_func,
train_loop_config={"lr": 2e-5, "batch_size": 4, "max_steps": 100},
scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
)
trainer.fit()

完全なスクリプトは、このページの最後にある「完全なトレーニング用スクリプト」に記載されています。

ステップ3:実行を送信する

Bash
air run -f train.yaml --dry-run
air run -f train.yaml --watch

ステップ 4: 実行を確認する

Bash
air get run <run-id>
air logs <run-id>

Ray ヘッドとドライバーはどちらもノード 0 で実行されるため、ログは 1 つのノードからストリームされます。

結果の保存場所

ray.train.reportで報告され、MLflowでログに記録されたメトリクスは、experiment_nameで指定されたMLflowエクスペリメントに表示され、ワークスペースのMLflow UIで確認できます。

完全なトレーニングスクリプト

コピー/貼り付けの完全なtrain_ray.py

Python
#!/usr/bin/env python3
"""Distributed data-parallel fine-tuning with Ray Train on a single 8x H100 node.

The workload `command` starts a Ray head with 8 GPUs and runs this script. Ray Train's
TorchTrainer launches one worker per GPU (8 total), wraps the model in DDP, shards
the dataset across workers, and aggregates metrics. Each worker runs `train_func`.

Uses a public model (no Hugging Face token required) so the example runs as-is.
"""

import os

import mlflow
import ray
import ray.train
import torch
from datasets import load_dataset
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer, prepare_data_loader, prepare_model
from torch.utils.data import DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_NAME = "Qwen/Qwen2.5-3B"
DATASET_NAME = "tatsu-lab/alpaca"
MAX_SEQ_LEN = 1024


def build_dataset(tokenizer):
raw = load_dataset(DATASET_NAME, split="train[:8000]")

def format_example(row):
prompt = f"### Instruction:\n{row['instruction']}\n\n"
if row.get("input"):
prompt += f"### Input:\n{row['input']}\n\n"
text = f"{prompt}### Response:\n{row['output']}{tokenizer.eos_token}"
out = tokenizer(text, truncation=True, max_length=MAX_SEQ_LEN, padding="max_length")
out["labels"] = out["input_ids"].copy()
return out

return raw.map(format_example, remove_columns=raw.column_names)


def train_func(config: dict):
"""Runs on every Ray Train worker (one per GPU)."""
rank = ray.train.get_context().get_world_rank()

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
model.config.use_cache = False
# prepare_model moves the model to this worker's GPU and wraps it in DDP.
model = prepare_model(model)

dataset = build_dataset(tokenizer).with_format("torch")
loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
# prepare_data_loader injects a DistributedSampler and moves batches to the GPU.
loader = prepare_data_loader(loader)

optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

# AI Runtime injects MLFLOW_RUN_ID and configures the databricks tracking URI on
# the node, so logging works without DATABRICKS_HOST/TOKEN. Gate on MLFLOW_RUN_ID
# so the script also runs cleanly off-platform (e.g. locally) where it is unset.
use_mlflow = rank == 0 and bool(os.environ.get("MLFLOW_RUN_ID"))
if use_mlflow:
mlflow.start_run(run_id=os.environ.get("MLFLOW_RUN_ID"))
mlflow.log_params({"model": MODEL_NAME, "lr": config["lr"], "batch_size": config["batch_size"]})

model.train()
step = 0
for batch in loader:
out = model(
input_ids=batch["input_ids"],
attention_mask=batch["attention_mask"],
labels=batch["labels"],
)
out.loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
step += 1

ray.train.report({"loss": out.loss.item(), "step": step})
if use_mlflow:
mlflow.log_metric("train_loss", out.loss.item(), step=step)
if step >= config["max_steps"]:
break

if use_mlflow:
mlflow.end_run()


def main():
ray.init(address="auto")
total_gpus = int(ray.cluster_resources().get("GPU", 0))
print(f"Ray cluster ready: {total_gpus} GPU(s)", flush=True)

trainer = TorchTrainer(
train_func,
train_loop_config={&quot;lr&quot;: 2e-5, &quot;batch_size&quot;: 4, &quot;max_steps&quot;: 100},
scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
run_config=RunConfig(storage_path="/tmp/ray_results", name="qwen-sft"),
)
result = trainer.fit()
print(f"Training finished. Final metrics: {result.metrics}", flush=True)

ray.shutdown()


if __name__ == "__main__":
main()

次のステップ