メインコンテンツまでスキップ

RetinaNet画像検出モデルをトレーニングする

このノートブックでは、 Databricksサーバレス GPU コンピュート上でPyTorchと torchvision を使用して、RetinaNet 物体検出モデルを最初からトレーニングする方法を示します。 RetinaNetは、特徴ピラミッドネットワーク(FPN)と焦点損失を用いてクラスの不均衡に対処する、単段式物体検出モデルです。

ノートブックの内容は次のとおりです。

  • 物体検出のための COCO データセットのロードと変換
  • 単一のGPU上でResNet-50バックボーンを使用したRetinaNetモデルのトレーニングを行う
  • 分散データ並列処理(DDP)を用いた複数GPUにわたるトレーニングのスケーリング
  • MLflowを使用したトレーニング メトリクスのロギング

サーバレス GPU コンピュートの接続

このノートブックを実行するには、シングルGPUトレーニング用に**1xA10**、または分散セクション用に**8xH100**を使用して、サーバレスGPUコンピュートに接続します。

  1. ノートブックの右上にあるコンピュートセレクターをクリックし、**サーバレスGPU**を選択します。
  2. 右側にある環境ボタンをクリックします。
  3. アクセラレータ として 1xA10 または 8xH100 を選択します。
  4. 環境として[AI v5] を選択し、[適用]をクリックします。

必要なパッケージをインストールします

COCO データセット ユーティリティ用の pycocotools をインストールし、 Python環境を再起動して新しいパッケージを読み込みます。

Python
%pip install pycocotools
dbutils.library.restartPython()

ウィジェットを使用してUnity Catalogパスを設定する

Unity Catalogカタログ、スキーマ、および COCO データセットが保存されているボリュームを指定するウィジェットを定義します。

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")

print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")

PyTorchライブラリをインポートする

画像検出モデルの構築とトレーニングのために、torchとtorchvisionをインポートします。

Python
import torch
import torchvision

モデルとデータセットのクラスをインポートします。

RetinaNet モデル アーキテクチャと COCO データセット ユーティリティを torchvision からインポートします。

Python
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2

# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection

分散型トレーニングユーティリティをインポートする

マルチ GPU トレーニング用にPyTorch分散トレーニング モジュールとサーバレス GPU 分散デコレータをインポートします。

トレーニングのハイパーパラメータとデータパスを定義する

データ パス、バッチ サイズ、クラス数、学習率、その他のトレーニング問題を構成します。 GPUの種類とトレーニング要件に基づいて、 BATCH_SIZENUM_EPOCHSを調整してください。

Python
import torch.distributed as dist
from serverless_gpu import distributed
Python
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")

BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly

RetinaNetモデルを初期化する

COCOデータセットのクラス数に合わせて、事前学習済みの重みを使用せずにResNet-50バックボーンを使用したRetinaNetモデルを作成します。 .

Python
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)

画像と注釈をモデル入力に変換する

このモデルは、形状が(C, H, W)、データ型がfloat32、正規化範囲が(0.0~1.0)のテンソルを入力として必要とします。get_transform関数はPIL画像を変換し、データ拡張を適用します。CocoWrapperクラスは COCO データセットをラップして、バウンディング ボックスとラベルを正しくフォーマットします。

Python
from torchvision.transforms import v2
from torchvision import tv_tensors

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)

# Sanity Check
img, target = dataset[0]

print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)

print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])

データローダーを作成する

トレーニング用の画像とターゲットをバッチ処理するために、カスタム照合を使用したデータローダーを定義します。

Python
from torch.utils.data import DataLoader

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)

オプティマイザを設定します

学習率、運動量、重み減衰を使用して SGD オプティマイザーをセットアップします。

Python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)

単一のGPUでモデルをトレーニングする

指定されたエポック数のトレーニング ループを実行し、損失メトリクスをMLflowに記録します。

Python
import time
import mlflow

model.train()

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):

images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")

print("Training Complete.")

分散データ並列(DDP)でトレーニングする

複数のGPUにわたって@distributedデコレーターを使用してトレーニングをスケールします。このアプローチでは、各ワーカーでデータをローカルストレージにコピーし、DistributedSampler を使用してデータセットを GPU 全体にパーティション分割します。すべてのインポート、変換、データセット、およびDataLoaderは、分散実行モデルで必要とされるため、トレーニング関数内で再定義されます。

Python
from datetime import timedelta

BATCH_SIZE_PER_GPU = 8 # for better performance with H100

@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.utils.data import DataLoader
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection

def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)

def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)

class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms

def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]

boxes = []
labels = []

for obj in target:
x, y, w, h = obj["bbox"]

boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])

if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)

w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)

final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}

if self._transforms is not None:
img, final_target = self._transforms(img, final_target)

return img, final_target

dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))

rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])

torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")

uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"

if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")

dist.barrier()

local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")

dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)

train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)

train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU,
shuffle=False,
num_workers=8, # 8 workers * 8 GPUs = 64 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=4,
sampler=train_sampler
)

model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)

model = DDP(model, device_ids=[local_rank])

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")

with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)

start_time = time.time()
epoch_loss = 0

for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())

optimizer.zero_grad()
losses.backward()
optimizer.step()

epoch_loss += losses.item()

if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)

if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")

lr_scheduler.step()

if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")

dist.destroy_process_group()

train_distributed.distributed()

次のステップ

サンプルノートブック

RetinaNet画像検出モデルをトレーニングする

ノートブックを新しいタブで開く