Pular para o conteúdo principal

Treinamento distribuído com Ray Train

info

Beta

Este recurso está em Beta. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Pré-visualizações . Consulte Gerenciar prévias do Databricks.

Este exemplo executa o ajuste fino distribuído e paralelo a dados com o TorchTrainer do Ray Train em 8 GPUs H100 em um único nó. Um script de bootstrap inicia um cluster Ray no nó, em seguida, o driver Ray Train lança um worker por GPU, encapsula o modelo em DDP e fragmenta o dataset automaticamente entre os workers.

Ele ajusta um modelo público (Qwen2.5-3B), Então, ele é executado como está sem um token do Hugging Face.

A carga de trabalho executa as seguintes ações:

  • Faz upload do projeto local com code_source: snapshot.
  • Começa um Ray head com todas as 8 GPUs, depois executa o driver do Ray Train.
  • Usa ray.train.torch.prepare_model e prepare_data_loader para lidar com o empacotamento DDP, o posicionamento de dispositivos e a amostragem distribuída.
  • Registra métricas no MLflow.

Pré-requisitos

Disposição do projeto

Criar um diretório com os seguintes arquivos.

Text
ray_train_distributed/
├── train.yaml # air workload config (inline dependencies + Ray bootstrap)
└── train_ray.py # Ray Train TorchTrainer driver + per-worker training

O passo 1: Escreva a carga de trabalho YAML

train.yaml solicita um nó único de GPU_8xH100. As dependências são declaradas embutidas em environment (com a imagem do cliente version), e o command inicia um cluster Ray no nó e executa o driver, portanto, a carga de trabalho não precisa de requirements.yaml ou script de inicialização separado:

YAML
experiment_name: air-ray-train-distributed

environment:
version: '4'
dependencies:
- ray[default,train]>=2.30
- transformers>=4.45
- datasets>=3.0
- huggingface_hub>=0.34
# The base image ships fsspec 2023.5.0, which is too old for modern
# huggingface_hub and breaks dataset/model downloads. Pin a newer fsspec.
- fsspec>=2024.6.1

# 8 H100 on a single node. Ray Train launches one worker per GPU.
compute:
num_accelerators: 8
accelerator_type: GPU_8xH100

code_source:
type: snapshot
snapshot:
root_path: .

command: |
cd $CODE_SOURCE_PATH
RAY_HEAD_PORT=6379
GPUS_PER_NODE=${LOCAL_WORLD_SIZE:-8}
if [ "${NODE_RANK:-0}" = "0" ]; then
echo "NODE_RANK=0: starting Ray head with $GPUS_PER_NODE GPU(s)..."
ray start --head --port=$RAY_HEAD_PORT --num-gpus="$GPUS_PER_NODE" --dashboard-host=0.0.0.0
python train_ray.py
ray stop
else
echo "NODE_RANK=$NODE_RANK: connecting to Ray head at $MASTER_ADDR:$RAY_HEAD_PORT..."
for i in $(seq 1 12); do
if ray start --address="$MASTER_ADDR:$RAY_HEAD_PORT" --num-gpus="$GPUS_PER_NODE" --block 2>/dev/null; then
break
fi
echo "Attempt $i failed, retrying in 5s..."
sleep 5
done
fi

max_retries: 0
timeout_minutes: 90
env_variables:
NCCL_SOCKET_IFNAME: eth0

O command em linha inicia um head do Ray com todas as GPUs no nó, executa o driver com python train_ray.py e para o cluster. Ele também inclui um branch worker que se join à cabeça, assim o mesmo comando continua funcionando se o Job for escalado para múltiplos nós.

Passo 2: definir o driver Ray ensinar

train_ray.py define um train_func que é execução em cada worker e um main que configura o TorchTrainer para usar todas as GPUs no clusters. prepare_model empacota o modelo em DDP e o move para a GPU do worker. prepare_data_loader adiciona um amostrador distribuído:

Python
def train_func(config: dict):
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
model.config.use_cache = False
model = prepare_model(model) # DDP wrap + device placement

loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
loader = prepare_data_loader(loader) # distributed sampler + GPU transfer
optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])
...
ray.train.report({"loss": out.loss.item(), "step": step})


def main():
ray.init(address="auto")
total_gpus = int(ray.cluster_resources().get("GPU", 0))
trainer = TorchTrainer(
train_func,
train_loop_config={"lr": 2e-5, "batch_size": 4, "max_steps": 100},
scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
)
trainer.fit()

O script completo está listado em Script de treinamento completo no final desta página.

Passo 3: Enviar a execução

Bash
air run -f train.yaml --dry-run
air run -f train.yaml --watch

o passo 4: inspeção da execução

Bash
air get run <run-id>
air logs <run-id>

A cabeça do Ray e o driver são executados no nó 0, assim os logs são transmitidos de um único nó.

Onde os resultados são armazenados

Métricas relatadas com ray.train.report e registradas com o MLflow aparecem no experimento MLflow nomeado em experiment_name, visível na UI do MLflow do workspace.

Script de treinamento completo

O train_ray.py completo para copiar e colar:

Python
#!/usr/bin/env python3
"""Distributed data-parallel fine-tuning with Ray Train on a single 8x H100 node.

The workload `command` starts a Ray head with 8 GPUs and runs this script. Ray Train's
TorchTrainer launches one worker per GPU (8 total), wraps the model in DDP, shards
the dataset across workers, and aggregates metrics. Each worker runs `train_func`.

Uses a public model (no Hugging Face token required) so the example runs as-is.
"""

import os

import mlflow
import ray
import ray.train
import torch
from datasets import load_dataset
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer, prepare_data_loader, prepare_model
from torch.utils.data import DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer

MODEL_NAME = "Qwen/Qwen2.5-3B"
DATASET_NAME = "tatsu-lab/alpaca"
MAX_SEQ_LEN = 1024


def build_dataset(tokenizer):
raw = load_dataset(DATASET_NAME, split="train[:8000]")

def format_example(row):
prompt = f"### Instruction:\n{row['instruction']}\n\n"
if row.get("input"):
prompt += f"### Input:\n{row['input']}\n\n"
text = f"{prompt}### Response:\n{row['output']}{tokenizer.eos_token}"
out = tokenizer(text, truncation=True, max_length=MAX_SEQ_LEN, padding="max_length")
out["labels"] = out["input_ids"].copy()
return out

return raw.map(format_example, remove_columns=raw.column_names)


def train_func(config: dict):
"""Runs on every Ray Train worker (one per GPU)."""
rank = ray.train.get_context().get_world_rank()

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
model.config.use_cache = False
# prepare_model moves the model to this worker's GPU and wraps it in DDP.
model = prepare_model(model)

dataset = build_dataset(tokenizer).with_format("torch")
loader = DataLoader(dataset, batch_size=config["batch_size"], shuffle=True, drop_last=True)
# prepare_data_loader injects a DistributedSampler and moves batches to the GPU.
loader = prepare_data_loader(loader)

optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"])

# AI Runtime injects MLFLOW_RUN_ID and configures the databricks tracking URI on
# the node, so logging works without DATABRICKS_HOST/TOKEN. Gate on MLFLOW_RUN_ID
# so the script also runs cleanly off-platform (e.g. locally) where it is unset.
use_mlflow = rank == 0 and bool(os.environ.get("MLFLOW_RUN_ID"))
if use_mlflow:
mlflow.start_run(run_id=os.environ.get("MLFLOW_RUN_ID"))
mlflow.log_params({"model": MODEL_NAME, "lr": config["lr"], "batch_size": config["batch_size"]})

model.train()
step = 0
for batch in loader:
out = model(
input_ids=batch["input_ids"],
attention_mask=batch["attention_mask"],
labels=batch["labels"],
)
out.loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
optimizer.zero_grad()
step += 1

ray.train.report({"loss": out.loss.item(), "step": step})
if use_mlflow:
mlflow.log_metric("train_loss", out.loss.item(), step=step)
if step >= config["max_steps"]:
break

if use_mlflow:
mlflow.end_run()


def main():
ray.init(address="auto")
total_gpus = int(ray.cluster_resources().get("GPU", 0))
print(f"Ray cluster ready: {total_gpus} GPU(s)", flush=True)

trainer = TorchTrainer(
train_func,
train_loop_config={&quot;lr&quot;: 2e-5, &quot;batch_size&quot;: 4, &quot;max_steps&quot;: 100},
scaling_config=ScalingConfig(num_workers=total_gpus, use_gpu=True),
run_config=RunConfig(storage_path="/tmp/ray_results", name="qwen-sft"),
)
result = trainer.fit()
print(f"Training finished. Final metrics: {result.metrics}", flush=True)

ray.shutdown()


if __name__ == "__main__":
main()

Passos seguintes