ensinar um modelo de detecção de imagens RetinaNet
Este Notebook demonstra como ensinar um modelo de detecção de objetos RetinaNet do zero usando PyTorch e torchvision em compute GPU serverless Databricks . RetinaNet é um modelo de detecção de objetos de estágio único que utiliza uma Rede Piramidal de Recursos (FPN) e perda focal para lidar com o desequilíbrio de classes.
O livro "The Notebook" aborda os seguintes temas:
- Carregando e transformando o dataset COCO para detecção de objetos.
- Treinar um modelo RetinaNet com backbone ResNet-50 em uma única GPU.
- Escalando o treinamento em várias GPUs usando Paralelismo de Dados Distribuídos (DDP)
- Registrando métricas de treinamento com MLflow
Requisitos
Computação : Conecte-se à compute GPU serverless com A10s ou H100s.
Hiperparâmetros recomendados por tipo de GPU :
- A10s :
batch_size/batch_size_per_gpu= 2,prefetch_factor= 2 - H100s :
batch_size/batch_size_per_gpu= 8,prefetch_factor= 4
PacotePython : Este notebook instala pycocotools para avaliação dataset COCO russias.
Instale o pacote necessário
Instale o pacote pycocotools para obter as utilidades dataset COCO e reinicie o ambiente Python para carregar o novo pacote.
%pip install pycocotools
dbutils.library.restartPython()
Configure os caminhos Unity Catalog com widgets.
Defina widgets para especificar o catálogo, o esquema e o volume do Unity Catalog onde o dataset COCO está armazenado.
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
Importar biblioteca PyTorch
Importe o torch e o torchvision para construir e treinar o modelo de detecção de imagens.
import torch
import torchvision
Importar classes de modelo e dataset
Importe a arquitetura do modelo RetinaNet e as utilidades dataset COCO do torchvision.
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2
# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection
Importação distribuída treinamento russo
Importe os módulos de treinamento distribuído PyTorch e o decorador distribuído de GPU serverless para treinamento com múltiplas GPUs.
Defina os hiperparâmetros de treinamento e os caminhos de dados.
Configure os caminhos de dados, o tamanho dos lotes, o número de classes, a taxa de aprendizado e outros parâmetros de treinamento. Ajuste BATCH_SIZE e NUM_EPOCHS com base no tipo de GPU e nos requisitos de treinamento.
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from serverless_gpu import distributed
import time
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")
BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY=0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly
Inicialize o modelo RetinaNet
Crie um modelo RetinaNet com arquitetura ResNet-50 sem pesos pré-treinados, configurado para o número de classes no dataset COCO.
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
Transforme imagens e anotações em entradas do modelo
O modelo requer entradas como tensores com formato (C, H, W), tipo de dados float32 e intervalo normalizado (0,0 a 1,0). A função get_transform converte imagens PIL e aplica aumento de dados. A classe CocoWrapper envolve o dataset COCO para formatar caixas delimitadoras e rótulos corretamente.
from torchvision.transforms import v2
from torchvision import tv_tensors
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)
# Sanity Check
img, target = dataset[0]
print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)
print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])
Criar o carregador de dados
Defina um DataLoader com ordenação personalizada para agrupar imagens e alvos para treinamento.
from torch.utils.data import DataLoader
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)
Configure o otimizador
Configure o otimizador SGD com os parâmetros de taxa de aprendizado, momento e decaimento de peso.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)
ensinar o modelo em uma única GPU
execução do loop de treinamento para o número especificado de épocas, registrando métricas de perdas no MLflow.
import time
import mlflow
model.train()
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")
print("Training Complete.")
professor com paralelismo de dados distribuídos (DDP)
escala treinamento em várias GPUs usando o decorador @distributed . Essa abordagem copia os dados para o armazenamento local em cada worker e usa o DistributedSampler para particionar o dataset entre as GPUs.
import shutil
from datetime import timedelta
BATCH_SIZE_PER_GPU = 2
@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")
uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"
if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")
dist.barrier()
local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")
dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)
train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU, # please use batch size per gpu of 8 with H100 for best performance
shuffle=False,
num_workers=8, # 8 workers * 2 GPUs = 16 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2,
sampler=train_sampler
)
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)
model = DDP(model, device_ids=[local_rank])
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")
dist.destroy_process_group()
train_distributed.distributed()
Próximos passos
- Melhores práticas para computede GPU serverless
- Solucionar problemas em computeGPU serverless
- Treinamento distribuído com múltiplas GPUs e múltiplos nós
- ensinar modelos com PyTorch
- modelos registrados com MLflow