ensinar um modelo de detecção de imagens RetinaNet
Este Notebook demonstra como ensinar um modelo de detecção de objetos RetinaNet do zero usando PyTorch e torchvision em compute GPU serverless Databricks . RetinaNet é um modelo de detecção de objetos de estágio único que utiliza uma Rede Piramidal de Recursos (FPN) e perda focal para lidar com o desequilíbrio de classes.
O livro "The Notebook" aborda os seguintes temas:
- Carregando e transformando o dataset COCO para detecção de objetos.
- Treinar um modelo RetinaNet com backbone ResNet-50 em uma única GPU.
- Escalando o treinamento em várias GPUs usando Paralelismo de Dados Distribuídos (DDP)
- Registrando métricas de treinamento com MLflow
Conectar a compute de GPU serverless
Para executar este Notebook, conecte-se ao compute de GPU serverless com 1xA10 para treinamento de GPU única ou 8xH100 para a seção distribuída.
- Clique no seletor de compute do notebook no canto superior direito e selecione **Serverless GPU**.
- À direita, clique no botão Ambiente.
- Selecione 1xA10 ou 8xH100 como o Acelerador .
- Selecione **AI v5** como seu ambiente, e clique em **Aplicar**.
Instale o pacote necessário
Instale o pacote pycocotools para obter as utilidades dataset COCO e reinicie o ambiente Python para carregar o novo pacote.
%pip install pycocotools
dbutils.library.restartPython()
Configure os caminhos Unity Catalog com widgets.
Defina widgets para especificar o catálogo, o esquema e o volume do Unity Catalog onde o dataset COCO está armazenado.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
Importar biblioteca PyTorch
Importe o torch e o torchvision para construir e treinar o modelo de detecção de imagens.
import torch
import torchvision
Importar classes de modelo e dataset
Importe a arquitetura do modelo RetinaNet e as utilidades dataset COCO do torchvision.
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2
# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection
Importação distribuída treinamento russo
Importe os módulos de treinamento distribuído PyTorch e o decorador distribuído de GPU serverless para treinamento com múltiplas GPUs.
Defina os hiperparâmetros de treinamento e os caminhos de dados.
Configure os caminhos de dados, o tamanho dos lotes, o número de classes, a taxa de aprendizado e outros parâmetros de treinamento. Ajuste BATCH_SIZE e NUM_EPOCHS com base no tipo de GPU e nos requisitos de treinamento.
import torch.distributed as dist
from serverless_gpu import distributed
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")
BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly
Inicialize o modelo RetinaNet
Crie um modelo RetinaNet com arquitetura ResNet-50 sem pesos pré-treinados, configurado para o número de classes no dataset COCO.
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
Transforme imagens e anotações em entradas do modelo
O modelo requer entradas como tensores com formato (C, H, W), tipo de dados float32 e intervalo normalizado (0,0 a 1,0). A função get_transform converte imagens PIL e aplica aumento de dados. A classe CocoWrapper envolve o dataset COCO para formatar caixas delimitadoras e rótulos corretamente.
from torchvision.transforms import v2
from torchvision import tv_tensors
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)
# Sanity Check
img, target = dataset[0]
print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)
print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])
Criar o carregador de dados
Defina um DataLoader com ordenação personalizada para agrupar imagens e alvos para treinamento.
from torch.utils.data import DataLoader
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)
Configure o otimizador
Configure o otimizador SGD com os parâmetros de taxa de aprendizado, momento e decaimento de peso.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)
ensinar o modelo em uma única GPU
execução do loop de treinamento para o número especificado de épocas, registrando métricas de perdas no MLflow.
import time
import mlflow
model.train()
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")
print("Training Complete.")
professor com paralelismo de dados distribuídos (DDP)
Dimensione o treinamento em várias GPUs usando o decorador @distributed. Esta abordagem copia dados para o armazenamento local em cada worker e usa o DistributedSampler para particionar o dataset entre as GPUs. Todas as importações, transformações, o dataset e o DataLoader são redefinidos dentro da função de treinamento, conforme exigido pelo modelo de execução distribuído.
from datetime import timedelta
BATCH_SIZE_PER_GPU = 8 # for better performance with H100
@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.utils.data import DataLoader
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")
uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"
if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")
dist.barrier()
local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")
dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)
train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU,
shuffle=False,
num_workers=8, # 8 workers * 8 GPUs = 64 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=4,
sampler=train_sampler
)
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)
model = DDP(model, device_ids=[local_rank])
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")
dist.destroy_process_group()
train_distributed.distributed()
Próximos passos
- Melhores práticas para computede GPU serverless
- Solucionar problemas em computeGPU serverless
- Treinamento distribuído com múltiplas GPUs e múltiplos nós
- ensinar modelos com PyTorch
- modelos registrados com MLflow