RetinaNet画像検出モデルをトレーニングする
このノートブックでは、 Databricksサーバレス GPU コンピュート上でPyTorchと torchvision を使用して、RetinaNet 物体検出モデルを最初からトレーニングする方法を示します。 RetinaNet は、特徴ピラミッド ネットワーク (FPN) と焦点損失を使用してクラスの不均衡を処理する、単一ステージのオブジェクト検出モデルです。
ノートブックには次の内容が記載されています。
- 物体検出のための COCO データセットの読み込みと変換
- ResNet-50 バックボーンを使用した RetinaNet モデルを単一 GPU でトレーニングする
- 分散データ並列 (DDP) を使用して複数の GPU にわたるトレーニングをスケーリングする
- MLflowを使用したトレーニング メトリクスのロギング
要件
コンピュート :A10s または H100s でサーバレス GPU コンピュートに接続します。
GPUタイプ別の推奨ハイパーパラメータ :
- A10 :
batch_size/batch_size_per_gpu= 2、prefetch_factor= 2 - H100 :
batch_size/batch_size_per_gpu= 8、prefetch_factor= 4
Python パッケージ : このノートブックは、COCO データセット評価ユーティリティ用のpycocotoolsをインストールします。
必要なパッケージをインストールする
COCO データセット ユーティリティ用の pycocotools をインストールし、Python 環境を再起動して新しいパッケージをロードします。
%pip install pycocotools
dbutils.library.restartPython()
ウィジェットを使用してUnity Catalogパスを設定する
COCO データセットが保存される Unity Catalog カタログ、スキーマ、ボリュームを指定するためのウィジェットを定義します。
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
PyTorchライブラリをインポートする
画像検出モデルの構築とトレーニングのために torch と torchvision をインポートします。
import torch
import torchvision
モデルとデータセットのクラスをインポートする
torchvision から RetinaNet モデル アーキテクチャと COCO データセット ユーティリティをインポートします。
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2
# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection
分散トレーニングユーティリティをインポートする
マルチ GPU トレーニング用にPyTorch分散トレーニング モジュールとサーバレス GPU 分散デコレータをインポートします。
トレーニングハイパーパラメータとデータパスを定義する
データ パス、バッチ サイズ、クラス数、学習率、その他のトレーニング問題を構成します。 GPU の種類とトレーニング要件に基づいてBATCH_SIZEとNUM_EPOCHSを調整します。
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
from serverless_gpu import distributed
import time
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")
BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY=0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly
RetinaNetモデルを初期化する
COCO データセット内のクラス数に合わせて構成された、事前トレーニング済みの重みのない ResNet-50 バックボーンを持つ RetinaNet モデルを作成します。
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
画像と注釈をモデル入力に変換する
このモデルには、形状 (C、H、W)、float32 データ型、正規化された範囲 (0.0 ~ 1.0) を持つテンソルとしての入力が必要です。get_transform関数は PIL イメージを変換し、データ拡張を適用します。CocoWrapperクラスは COCO データセットをラップして、境界ボックスとラベルを正しくフォーマットします。
from torchvision.transforms import v2
from torchvision import tv_tensors
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)
# Sanity Check
img, target = dataset[0]
print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)
print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])
データローダーを作成する
トレーニング用の画像とターゲットをバッチ処理するためのカスタム照合を備えた DataLoader を定義します。
from torch.utils.data import DataLoader
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)
オプティマイザーを構成する
学習率、運動量、重み減衰を使用して SGD オプティマイザーをセットアップします。
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)
単一のGPUでモデルをトレーニングする
指定されたエポック数のトレーニング ループを実行し、損失メトリクスをMLflowに記録します。
import time
import mlflow
model.train()
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")
print("Training Complete.")
分散データ並列(DDP)でトレーニングする
@distributedデコレータを使用して、複数の GPU にわたってトレーニングをスケーリングします。このアプローチでは、各ワーカーのローカル ストレージにデータをコピーし、DistributedSampler を使用してデータセットを GPU 間で分割します。
import shutil
from datetime import timedelta
BATCH_SIZE_PER_GPU = 2
@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")
uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"
if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")
dist.barrier()
local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")
dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)
train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU, # please use batch size per gpu of 8 with H100 for best performance
shuffle=False,
num_workers=8, # 8 workers * 2 GPUs = 16 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2,
sampler=train_sampler
)
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)
model = DDP(model, device_ids=[local_rank])
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")
dist.destroy_process_group()
train_distributed.distributed()
次のステップ
- サーバレス GPU コンピュートのベスト プラクティス
- サーバーレス GPU コンピュートの問題のトラブルシューティング
- マルチGPUおよびマルチノード分散トレーニング
- PyTorchでモデルをトレーニングする
- MLflowを使用した記録済みモデル