メインコンテンツまでスキップ

RetinaNet画像検出モデルをトレーニングする

このノートブックでは、 Databricksサーバレス GPU コンピュート上でPyTorchと torchvision を使用して、RetinaNet 物体検出モデルを最初からトレーニングする方法を示します。 RetinaNet は、特徴ピラミッド ネットワーク (FPN) と焦点損失を使用してクラスの不均衡を処理する、単一ステージのオブジェクト検出モデルです。

ノートブックには次の内容が記載されています。

  • 物体検出のための COCO データセットの読み込みと変換
  • ResNet-50 バックボーンを使用した RetinaNet モデルを単一 GPU でトレーニングする
  • 分散データ並列 (DDP) を使用して複数の GPU にわたるトレーニングをスケーリングする
  • MLflowを使用したトレーニング メトリクスのロギング

要件

コンピュート :A10s または H100s でサーバレス GPU コンピュートに接続します。

GPUタイプ別の推奨ハイパーパラメータ :

  • A10 : batch_size / batch_size_per_gpu = 2、 prefetch_factor = 2
  • H100batch_size / batch_size_per_gpu = 8、 prefetch_factor = 4

Python パッケージ : このノートブックは、COCO データセット評価ユーティリティ用のpycocotoolsをインストールします。

必要なパッケージをインストールする

COCO データセット ユーティリティ用の pycocotools をインストールし、Python 環境を再起動して新しいパッケージをロードします。

Python
%pip install pycocotools
dbutils.library.restartPython()

ウィジェットを使用してUnity Catalogパスを設定する

COCO データセットが保存される Unity Catalog カタログ、スキーマ、ボリュームを指定するためのウィジェットを定義します。

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")

PyTorchライブラリをインポートする

画像検出モデルの構築とトレーニングのために torch と torchvision をインポートします。

Python
import torch
import torchvision

モデルとデータセットのクラスをインポートする

torchvision から RetinaNet モデル アーキテクチャと COCO データセット ユーティリティをインポートします。

Python
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2

# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection

分散トレーニングユーティリティをインポートする

マルチ GPU トレーニング用にPyTorch分散トレーニング モジュールとサーバレス GPU 分散デコレータをインポートします。

トレーニングハイパーパラメータとデータパスを定義する

データ パス、バッチ サイズ、クラス数、学習率、その他のトレーニング問題を構成します。 GPU の種類とトレーニング要件に基づいてBATCH_SIZENUM_EPOCHSを調整します。

Python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from serverless_gpu import distributed
import time
Python
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")

BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY=0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly

RetinaNetモデルを初期化する

COCO データセット内のクラス数に合わせて構成された、事前トレーニング済みの重みのない ResNet-50 バックボーンを持つ RetinaNet モデルを作成します。

Python
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)

画像と注釈をモデル入力に変換する

このモデルには、形状 (C、H、W)、float32 データ型、正規化された範囲 (0.0 ~ 1.0) を持つテンソルとしての入力が必要です。get_transform関数は PIL イメージを変換し、データ拡張を適用します。CocoWrapperクラスは COCO データセットをラップして、境界ボックスとラベルを正しくフォーマットします。

Python
from torchvision.transforms import v2
from torchvision import tv_tensors

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)

# Sanity Check
img, target = dataset[0]

print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)

print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])

データローダーを作成する

トレーニング用の画像とターゲットをバッチ処理するためのカスタム照合を備えた DataLoader を定義します。

Python
from torch.utils.data import DataLoader

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)

オプティマイザーを構成する

学習率、運動量、重み減衰を使用して SGD オプティマイザーをセットアップします。

Python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)

単一のGPUでモデルをトレーニングする

指定されたエポック数のトレーニング ループを実行し、損失メトリクスをMLflowに記録します。

Python
import time
import mlflow

model.train()

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):

images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")

print("Training Complete.")

分散データ並列(DDP)でトレーニングする

@distributedデコレータを使用して、複数の GPU にわたってトレーニングをスケーリングします。このアプローチでは、各ワーカーのローカル ストレージにデータをコピーし、DistributedSampler を使用してデータセットを GPU 間で分割します。

Python
import shutil
from datetime import timedelta

BATCH_SIZE_PER_GPU = 2

@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))

rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")

uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"

if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")

dist.barrier()

local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")

dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)

train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU, # please use batch size per gpu of 8 with H100 for best performance
shuffle=False,
num_workers=8, # 8 workers * 2 GPUs = 16 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2,
sampler=train_sampler
)

model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)

model = DDP(model, device_ids=[local_rank])

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)

start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())

optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()

if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")

dist.destroy_process_group()

train_distributed.distributed()

次のステップ

サンプルノートブック

RetinaNet画像検出モデルをトレーニングする

ノートブックを新しいタブで開く