Pular para o conteúdo principal

ensinar um modelo de detecção de imagens RetinaNet

Este Notebook demonstra como ensinar um modelo de detecção de objetos RetinaNet do zero usando PyTorch e torchvision em compute GPU serverless Databricks . RetinaNet é um modelo de detecção de objetos de estágio único que utiliza uma Rede Piramidal de Recursos (FPN) e perda focal para lidar com o desequilíbrio de classes.

O livro "The Notebook" aborda os seguintes temas:

  • Carregando e transformando o dataset COCO para detecção de objetos.
  • Treinar um modelo RetinaNet com backbone ResNet-50 em uma única GPU.
  • Escalando o treinamento em várias GPUs usando Paralelismo de Dados Distribuídos (DDP)
  • Registrando métricas de treinamento com MLflow

Conectar a compute de GPU serverless

Para executar este Notebook, conecte-se ao compute de GPU serverless com 1xA10 para treinamento de GPU única ou 8xH100 para a seção distribuída.

  1. Clique no seletor de compute do notebook no canto superior direito e selecione **Serverless GPU**.
  2. À direita, clique no botão Ambiente.
  3. Selecione 1xA10 ou 8xH100 como o Acelerador .
  4. Selecione **AI v5** como seu ambiente, e clique em **Aplicar**.

Instale o pacote necessário

Instale o pacote pycocotools para obter as utilidades dataset COCO e reinicie o ambiente Python para carregar o novo pacote.

Python
%pip install pycocotools
dbutils.library.restartPython()

Configure os caminhos Unity Catalog com widgets.

Defina widgets para especificar o catálogo, o esquema e o volume do Unity Catalog onde o dataset COCO está armazenado.

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")

Importar biblioteca PyTorch

Importe o torch e o torchvision para construir e treinar o modelo de detecção de imagens.

Python
import torch
import torchvision

Importar classes de modelo e dataset

Importe a arquitetura do modelo RetinaNet e as utilidades dataset COCO do torchvision.

Python
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2

# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection

Importação distribuída treinamento russo

Importe os módulos de treinamento distribuído PyTorch e o decorador distribuído de GPU serverless para treinamento com múltiplas GPUs.

Defina os hiperparâmetros de treinamento e os caminhos de dados.

Configure os caminhos de dados, o tamanho dos lotes, o número de classes, a taxa de aprendizado e outros parâmetros de treinamento. Ajuste BATCH_SIZE e NUM_EPOCHS com base no tipo de GPU e nos requisitos de treinamento.

Python
import torch.distributed as dist
from serverless_gpu import distributed
Python
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")

BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly

Inicialize o modelo RetinaNet

Crie um modelo RetinaNet com arquitetura ResNet-50 sem pesos pré-treinados, configurado para o número de classes no dataset COCO.

Python
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)

Transforme imagens e anotações em entradas do modelo

O modelo requer entradas como tensores com formato (C, H, W), tipo de dados float32 e intervalo normalizado (0,0 a 1,0). A função get_transform converte imagens PIL e aplica aumento de dados. A classe CocoWrapper envolve o dataset COCO para formatar caixas delimitadoras e rótulos corretamente.

Python
from torchvision.transforms import v2
from torchvision import tv_tensors

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)

# Sanity Check
img, target = dataset[0]

print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)

print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])

Criar o carregador de dados

Defina um DataLoader com ordenação personalizada para agrupar imagens e alvos para treinamento.

Python
from torch.utils.data import DataLoader

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)

Configure o otimizador

Configure o otimizador SGD com os parâmetros de taxa de aprendizado, momento e decaimento de peso.

Python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)

ensinar o modelo em uma única GPU

execução do loop de treinamento para o número especificado de épocas, registrando métricas de perdas no MLflow.

Python
import time
import mlflow

model.train()

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):

images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")

print("Training Complete.")

professor com paralelismo de dados distribuídos (DDP)

Dimensione o treinamento em várias GPUs usando o decorador @distributed. Esta abordagem copia dados para o armazenamento local em cada worker e usa o DistributedSampler para particionar o dataset entre as GPUs. Todas as importações, transformações, o dataset e o DataLoader são redefinidos dentro da função de treinamento, conforme exigido pelo modelo de execução distribuído.

Python
from datetime import timedelta

BATCH_SIZE_PER_GPU = 8 # for better performance with H100

@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.utils.data import DataLoader
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))

rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")

uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"

if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")

dist.barrier()

local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")

dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)

train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU,
shuffle=False,
num_workers=8, # 8 workers * 8 GPUs = 64 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=4,
sampler=train_sampler
)

model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)

model = DDP(model, device_ids=[local_rank])

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)

start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())

optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()

if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")

dist.destroy_process_group()

train_distributed.distributed()

Próximos passos

Exemplo de caderno

ensinar um modelo de detecção de imagens RetinaNet

Abrir notebook em uma nova aba