メインコンテンツまでスキップ

RetinaNet画像検出モデルをトレーニングする

このノートブックでは、 Databricksサーバレス GPU コンピュート上でPyTorchと torchvision を使用して、RetinaNet 物体検出モデルを最初からトレーニングする方法を示します。 RetinaNetは、特徴ピラミッドネットワーク(FPN)と焦点損失を用いてクラスの不均衡に対処する、単段式物体検出モデルです。

ノートブックの内容は次のとおりです。

  • 物体検出のための COCO データセットのロードと変換
  • 単一のGPU上でResNet-50バックボーンを使用したRetinaNetモデルのトレーニングを行う
  • 分散データ並列処理(DDP)を用いた複数GPUにわたるトレーニングのスケーリング
  • MLflowを使用したトレーニング メトリクスのロギング

要件

コンピュート :A10s または H100s でサーバレス GPU コンピュートに接続します。

GPUタイプ別の推奨ハイパーパラメータ

  • A10s : batch_size / batch_size_per_gpu = 2、 prefetch_factor = 2
  • H100s : batch_size / batch_size_per_gpu = 8、 prefetch_factor = 4

Python パッケージ : このノートブックは、COCO データセットの評価ユーティリティ用にpycocotoolsをインストールします。

必要なパッケージをインストールします

COCO データセット ユーティリティ用の pycocotools をインストールし、 Python環境を再起動して新しいパッケージを読み込みます。

Python
%pip install pycocotools
dbutils.library.restartPython()

ウィジェットを使用してUnity Catalogパスを設定する

Unity Catalogカタログ、スキーマ、および COCO データセットが保存されているボリュームを指定するウィジェットを定義します。

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")

PyTorchライブラリをインポートする

画像検出モデルの構築とトレーニングのために、torchとtorchvisionをインポートします。

Python
import torch
import torchvision

モデルとデータセットのクラスをインポートします。

RetinaNet モデル アーキテクチャと COCO データセット ユーティリティを torchvision からインポートします。

Python
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2

# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection

分散型トレーニングユーティリティをインポートする

マルチ GPU トレーニング用にPyTorch分散トレーニング モジュールとサーバレス GPU 分散デコレータをインポートします。

トレーニングのハイパーパラメータとデータパスを定義する

データ パス、バッチ サイズ、クラス数、学習率、その他のトレーニング問題を構成します。 GPUの種類とトレーニング要件に基づいて、 BATCH_SIZENUM_EPOCHSを調整してください。

Python
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from serverless_gpu import distributed
import time
Python
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")

BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY=0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly

RetinaNetモデルを初期化する

COCOデータセットのクラス数に合わせて、事前学習済みの重みを使用せずにResNet-50バックボーンを使用したRetinaNetモデルを作成します。 .

Python
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)

画像と注釈をモデル入力に変換する

このモデルは、形状が(C, H, W)、データ型がfloat32、正規化範囲が(0.0~1.0)のテンソルを入力として必要とします。get_transform関数はPIL画像を変換し、データ拡張を適用します。CocoWrapperクラスは COCO データセットをラップして、バウンディング ボックスとラベルを正しくフォーマットします。

Python
from torchvision.transforms import v2
from torchvision import tv_tensors

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)

# Sanity Check
img, target = dataset[0]

print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)

print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])

データローダーを作成する

トレーニング用の画像とターゲットをバッチ処理するために、カスタム照合を使用したデータローダーを定義します。

Python
from torch.utils.data import DataLoader

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)

オプティマイザを設定します

学習率、運動量、重み減衰を使用して SGD オプティマイザーをセットアップします。

Python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)

単一のGPUでモデルをトレーニングする

指定されたエポック数のトレーニング ループを実行し、損失メトリクスをMLflowに記録します。

Python
import time
import mlflow

model.train()

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):

images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")

print("Training Complete.")

分散データ並列(DDP)でトレーニングする

@distributedデコレータを使用して、複数の GPU にトレーニングを分散させます。このアプローチでは、データを各ワーカーのローカルストレージにコピーし、DistributedSamplerを使用してデータセットをGPU間で分割します。

Python
import shutil
from datetime import timedelta

BATCH_SIZE_PER_GPU = 2

@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))

rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")

uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"

if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")

dist.barrier()

local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")

dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)

train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU, # please use batch size per gpu of 8 with H100 for best performance
shuffle=False,
num_workers=8, # 8 workers * 2 GPUs = 16 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2,
sampler=train_sampler
)

model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)

model = DDP(model, device_ids=[local_rank])

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)

start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())

optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()

if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")

dist.destroy_process_group()

train_distributed.distributed()

次のステップ

サンプルノートブック

RetinaNet画像検出モデルをトレーニングする

ノートブックを新しいタブで開く