Carga de trabalho com múltiplas GPUs e múltiplos nós
Você pode executar cargas de trabalho distribuídas em várias GPUs — seja em um único nó ou em vários nós. múltiplos nós -- usando a API Python para GPU sem servidor. A API fornece uma interface simples e unificada que abstrai os detalhes do provisionamento de GPUs. Configuração do ambiente e distribuição da carga de trabalho. Com alterações mínimas no código, você pode migrar sem problemas. Desde o treinamento em uma única GPU até a execução distribuída em GPUs remotas a partir do mesmo notebook.
O treinamento distribuído com múltiplas GPUs é suportado tanto em H100s quanto em A10s. Treinamento distribuído multinó Só é compatível com GPUs A10.
Comece rápido
A API de GPU serverless para o ambiente distribuído está pré-instalada no compute de GPU serverless ambientes para Databricks Notebook. Recomendamos o ambiente de GPU 4 ou superior. Para utilizá-lo em treinamento distribuído, importe e utilize o
distributed Decorador para distribuir sua função de treinamento.
O trecho de código abaixo mostra o uso básico de @distributed:
# Import the distributed decorator
from serverless_gpu import distributed
# Decorate your training function with @distributed and specify the number of GPUs, the GPU type,
# and whether or not the GPUs are remote
@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train():
...
Abaixo está um exemplo completo que ensina um modelo de perceptron multicamadas (MLP) em 8 nós de GPU A10 a partir de um Caderno:
-
Configure seu modelo e defina as funções de utilidade.
Python
# Define the model
import os
import torch
import torch.distributed as dist
import torch.nn as nn
def setup():
dist.init_process_group("nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
def cleanup():
dist.destroy_process_group()
class SimpleMLP(nn.Module):
def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, output_dim)
)
def forward(self, x):
return self.net(x) -
Importe a biblioteca serverless_gpu e o módulo distribuído .
Pythonimport serverless_gpu
from serverless_gpu import distributed -
Envolva o código de treinamento do modelo em uma função e decore a função com o decorador
@distributed.Python@distributed(gpus=8, gpu_type='A10', remote=True)
def run_train(num_epochs: int, batch_size: int) -> None:
import mlflow
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
# 1. Set up multi node environment
setup()
device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
# 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
model = SimpleMLP().to(device)
model = DDP(model, device_ids=[device])
# 3. Create and load dataset.
x = torch.randn(5000, 10)
y = torch.randn(5000, 1)
dataset = TensorDataset(x, y)
sampler = DistributedSampler(dataset)
dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
# 4. Define the training loop.
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
for epoch in range(num_epochs):
sampler.set_epoch(epoch)
model.train()
total_loss = 0.0
for step, (xb, yb) in enumerate(dataloader):
xb, yb = xb.to(device), yb.to(device)
optimizer.zero_grad()
loss = loss_fn(model(xb), yb)
# Log loss to MLflow metric
mlflow.log_metric("loss", loss.item(), step=step)
loss.backward()
optimizer.step()
total_loss += loss.item() * xb.size(0)
mlflow.log_metric("total_loss", total_loss)
print(f"Total loss for epoch {epoch}: {total_loss}")
cleanup() -
Execute o treinamento distribuído chamando a função distribuída com argumentos definidos pelo usuário.
Pythonrun_train.distributed(num_epochs=3, batch_size=1) -
Ao ser executado, um link de execução MLflow será gerado na saída da célula do Notebook. Clique no link de execução do MLflow ou encontre-o no painel de Experimentos para ver os resultados da execução.

Detalhes da execução distribuída
API de GPU sem servidor consiste em vários componentes key :
- Gerenciador de computação: Gerencia a alocação e o gerenciamento de recursos.
- Ambiente Runtime : gerenciar ambientes e dependências Python
- Iniciador: Orquestra a execução e o monitoramento de tarefas.
Ao executar em modo distribuído:
- A função é serializada e distribuída entre o número especificado de GPUs.
- Cada execução na GPU gera uma cópia da função com os mesmos parâmetros.
- O ambiente está sincronizado em todos os nós.
- Os resultados são coletados e retornados de todas as GPUs.
Se remote for definido como True, a carga de trabalho será distribuída nas GPUs remotas. Se remote estiver definido como
False, a carga de trabalho está sendo executada no único nó de GPU conectado pelo Notebook atual. Se o
O nó possui vários chips de GPU, e todos eles serão utilizados.
A API suporta bibliotecas populares de processamento paralelo, como Distributed Data Parallel (DDP). Dados Paralelos Totalmente Fragmentados (FSDP), DeepSpeed e Ray.
Você pode encontrar mais cenários reais de treinamento distribuído usando as várias bibliotecas nos exemplos de notebooks.
Lançamento com Ray
A API de GPU serverless também suporta o lançamento de treinamento distribuído usando Ray usando o @ray_launch
decorador, que é sobreposto a @distributed.
Cada tarefa ray_launch primeiro inicializa um encontro distribuído pelo tocha para decidir o worker principal do Ray.
e coletar endereços IP. Rank-zero começar ray start --head (com exportação de métricas se habilitado), conjuntos
RAY_ADDRESS e execução sua função decorada como o driver Ray. Outros nós join através de
ray start --address e aguarde até que o driver escreva um marcador de conclusão.
Detalhes adicionais de configuração:
- Para habilitar a coleta de métricas do sistema Ray em cada nó, use
RayMetricsMonitorcomremote=True. - Defina as opções de tempo de execução do Ray (atores, conjunto de dados, grupos de posicionamento e programador) dentro do seu Função decorada usando APIs Ray padrão.
- gerenciar controles em todo cluster(contagem e tipo de GPU, modo remoto vs. local, comportamento assíncrono e Databricks pool variável de ambiente) fora da função nos argumentos do decorador ou Ambiente de notebook.
O exemplo abaixo mostra como usar @ray_launch:
from serverless_gpu.ray import ray_launch
@ray_launch(gpus=16, remote=True, gpu_type='A10')
def foo():
import os
import ray
print(ray.state.available_resources_per_node())
return 1
foo.distributed()
Para um exemplo completo, veja este Notebook. que inicia o Ray para treinar uma rede neural Resnet18 em múltiplas GPUs A10.
Perguntas frequentes
Onde deve ser colocado o código de carregamento de dados?
Ao usar a APIde GPU sem servidor Para treinamento distribuído, mova o código de carregamento de dados para dentro do decorador @distributed . O dataset O tamanho pode exceder o tamanho máximo permitido pelo pickle, portanto, recomenda-se gerar o dataset dentro do decorator, conforme mostrado abaixo:
from serverless_gpu import distributed
# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, remote=True)
def run_train():
# good practice
dataset = get_dataset(file_path)
....
Como usar pool reservada?
Se houver pool de GPUs reservado disponível (verifique com o administrador) em seu workspace e você especificar
No decorador `@distributed`, de remote para True , a carga de trabalho será executada na GPU reservada.
pool por default. Se você deseja usar o pool de GPUs sob demanda, defina a variável de ambiente.
DATABRICKS_USE_RESERVED_GPU_POOL a True antes de chamar a função distribuída.
Saber mais
Para obter informações sobre a API , consulte a documentação API Python para GPUs sem servidor .