RetinaNet画像検出モデルをトレーニングする
このノートブックでは、 Databricksサーバレス GPU コンピュート上でPyTorchと torchvision を使用して、RetinaNet 物体検出モデルを最初からトレーニングする方法を示します。 RetinaNetは、特徴ピラミッドネットワーク(FPN)と焦点損失を用いてクラスの不均衡に対処する、単段式物体検出モデルです。
ノートブックの内容は次のとおりです。
- 物体検出のための COCO データセットのロードと変換
- 単一のGPU上でResNet-50バックボーンを使用したRetinaNetモデルのトレーニングを行う
- 分散データ並列処理(DDP)を用いた複数GPUにわたるトレーニングのスケーリング
- MLflowを使用したトレーニング メトリクスのロギング
サーバレス GPU コンピュートの接続
このノートブックを実行するには、シングルGPUトレーニング用に**1xA10**、または分散セクション用に**8xH100**を使用して、サーバレスGPUコンピュートに接続します。
- ノートブックの右上にあるコンピュートセレクターをクリックし、**サーバレスGPU**を選択します。
- 右側にある環境ボタンをクリックします。
- アクセラレータ として 1xA10 または 8xH100 を選択します。
- 環境として[AI v5] を選択し、[適用]をクリックします。
必要なパッケージをインストールします
COCO データセット ユーティリティ用の pycocotools をインストールし、 Python環境を再起動して新しいパッケージを読み込みます。
%pip install pycocotools
dbutils.library.restartPython()
ウィジェットを使用してUnity Catalogパスを設定する
Unity Catalogカタログ、スキーマ、および COCO データセットが保存されているボリュームを指定するウィジェットを定義します。
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "coco_data")
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
print(f"UC_CATALOG: {UC_CATALOG}")
print(f"UC_SCHEMA: {UC_SCHEMA}")
print(f"UC_VOLUME: {UC_VOLUME}")
PyTorchライブラリをインポートする
画像検出モデルの構築とトレーニングのために、torchとtorchvisionをインポートします。
import torch
import torchvision
モデルとデータセットのクラスをインポートします。
RetinaNet モデル アーキテクチャと COCO データセット ユーティリティを torchvision からインポートします。
import os
from torchvision.models.detection import retinanet_resnet50_fpn_v2
# For this example we will be using a default Dataset from torch
from torchvision.datasets import CocoDetection
分散型トレーニングユーティリティをインポートする
マルチ GPU トレーニング用にPyTorch分散トレーニング モジュールとサーバレス GPU 分散デコレータをインポートします。
トレーニングのハイパーパラメータとデータパスを定義する
データ パス、バッチ サイズ、クラス数、学習率、その他のトレーニング問題を構成します。 GPUの種類とトレーニング要件に基づいて、 BATCH_SIZEとNUM_EPOCHSを調整してください。
import torch.distributed as dist
from serverless_gpu import distributed
DATA_PATH = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/"
TRAIN_IMG_PATH = os.path.join(DATA_PATH, "val2017")
TRAIN_ANN_PATH = os.path.join(DATA_PATH, "annotations", "instances_val2017.json")
BATCH_SIZE = 2 # Please use batch size of 8 with H100 for best performance
NUM_CLASSES = 91
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005
NUM_EPOCHS = 1 # Update num_epochs accordingly
RetinaNetモデルを初期化する
COCOデータセットのクラス数に合わせて、事前学習済みの重みを使用せずにResNet-50バックボーンを使用したRetinaNetモデルを作成します。 .
# Since we are training the model from scratch, we need to initialize weights to None
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
画像と注釈をモデル入力に変換する
このモデルは、形状が(C, H, W)、データ型がfloat32、正規化範囲が(0.0~1.0)のテンソルを入力として必要とします。get_transform関数はPIL画像を変換し、データ拡張を適用します。CocoWrapperクラスは COCO データセットをラップして、バウンディング ボックスとラベルを正しくフォーマットします。
from torchvision.transforms import v2
from torchvision import tv_tensors
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dataset = CocoWrapper(
root = TRAIN_IMG_PATH,
annFile=TRAIN_ANN_PATH,
transforms=get_transform(train=True)
)
# Sanity Check
img, target = dataset[0]
print("Image type:", type(img))
print("Image shape:", img.shape) # should be [3, H, W]
print("Image dtype:", img.dtype)
print("\nTarget keys:", target.keys())
print("Boxes shape:", target["boxes"].shape)
print("Labels shape:", target["labels"].shape)
print("Image ID:", target["image_id"])
データローダーを作成する
トレーニング用の画像とターゲットをバッチ処理するために、カスタム照合を使用したデータローダーを定義します。
from torch.utils.data import DataLoader
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
shuffle=True,
num_workers=16,
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=2 # Please use a prefetch_factor of 4 with H100 for best performance
)
オプティマイザを設定します
学習率、運動量、重み減衰を使用して SGD オプティマイザーをセットアップします。
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
params,
lr=LEARNING_RATE,
momentum=MOMENTUM,
weight_decay=WEIGHT_DECAY
)
単一のGPUでモデルをトレーニングする
指定されたエポック数のトレーニング ループを実行し、損失メトリクスをMLflowに記録します。
import time
import mlflow
model.train()
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = list(image.to(device) for image in images)
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if i % 50 == 0:
print(f"Epoch {epoch+1} | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
end_time = time.time()
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(end_time - start_time)/60:.2f} min")
print("Training Complete.")
分散データ並列(DDP)でトレーニングする
複数のGPUにわたって@distributedデコレーターを使用してトレーニングをスケールします。このアプローチでは、各ワーカーでデータをローカルストレージにコピーし、DistributedSampler を使用してデータセットを GPU 全体にパーティション分割します。すべてのインポート、変換、データセット、およびDataLoaderは、分散実行モデルで必要とされるため、トレーニング関数内で再定義されます。
from datetime import timedelta
BATCH_SIZE_PER_GPU = 8 # for better performance with H100
@distributed(gpus=8, gpu_type='H100')
def train_distributed():
import os
import torch
import torch.distributed as dist
import time
import shutil
import torchvision
import mlflow
from torch.utils.data import DataLoader
from torchvision.models.detection import retinanet_resnet50_fpn_v2
from torchvision.transforms import v2
from torchvision import tv_tensors
from torchvision.datasets import CocoDetection
def get_transform(train):
transforms = []
transforms.append(v2.ToImage())
transforms.append(v2.ToDtype(torch.float32, scale=True))
if train:
transforms.append(v2.RandomHorizontalFlip())
return v2.Compose(transforms)
def collate_fn(batch):
images, targets = list(zip(*batch))
return list(images), list(targets)
class CocoWrapper(CocoDetection):
def __init__(self, root, annFile, transforms=None):
super().__init__(root, annFile)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super().__getitem__(idx)
image_id = self.ids[idx]
boxes = []
labels = []
for obj in target:
x, y, w, h = obj["bbox"]
boxes.append([x, y, x + w, y + h])
labels.append(obj["category_id"])
if len(boxes) == 0:
boxes = torch.zeros((0, 4), dtype=torch.float32)
labels = torch.zeros((0,), dtype=torch.int64)
else:
boxes = torch.as_tensor(boxes, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.int64)
w, h = img.size
boxes = torchvision.tv_tensors.BoundingBoxes(
data=boxes,
format=torchvision.tv_tensors.BoundingBoxFormat.XYXY,
canvas_size=(h, w)
)
final_target = {
"boxes": boxes,
"labels": labels,
"image_id": torch.tensor([image_id])
}
if self._transforms is not None:
img, final_target = self._transforms(img, final_target)
return img, final_target
dist.init_process_group(backend="nccl", timeout=timedelta(minutes=30))
rank = int(os.environ["RANK"])
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
torch.cuda.set_device(local_rank)
device = torch.device(f"cuda:{local_rank}")
uc_source_path = DATA_PATH
local_dest_path = "/tmp/coco_data"
if local_rank == 0:
if not os.path.exists(local_dest_path):
print(f"Rank {rank}: Copying data from {uc_source_path} to {local_dest_path}...")
shutil.copytree(uc_source_path, local_dest_path, dirs_exist_ok=True)
print(f"Rank {rank}: Data copy finished!")
else:
print(f"Rank {rank}: Data already exists in local temp.")
dist.barrier()
local_train_img_path = os.path.join(local_dest_path, "val2017")
local_train_ann_path = os.path.join(local_dest_path, "annotations", "instances_val2017.json")
dataset = CocoWrapper(
root=local_train_img_path,
annFile=local_train_ann_path,
transforms=get_transform(train=True)
)
train_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
train_loader = DataLoader(
dataset,
batch_size=BATCH_SIZE_PER_GPU,
shuffle=False,
num_workers=8, # 8 workers * 8 GPUs = 64 total CPU threads
collate_fn=collate_fn,
pin_memory=True,
prefetch_factor=4,
sampler=train_sampler
)
model = retinanet_resnet50_fpn_v2(weights=None, num_classes=NUM_CLASSES)
model.to(device)
model = DDP(model, device_ids=[local_rank])
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.04, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
model.train()
if rank == 0:
print(f"Training on {world_size} GPUs. Global Batch Size: {BATCH_SIZE_PER_GPU * world_size}")
with mlflow.start_run():
for epoch in range(NUM_EPOCHS):
train_sampler.set_epoch(epoch)
start_time = time.time()
epoch_loss = 0
for i, (images, targets) in enumerate(train_loader):
images = [image.to(device) for image in images]
targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
loss_dict = model(images, targets)
losses = sum(loss for loss in loss_dict.values())
optimizer.zero_grad()
losses.backward()
optimizer.step()
epoch_loss += losses.item()
if rank == 0:
mlflow.log_metric("loss", losses.item(), step=epoch * len(train_loader) + i)
if rank == 0 and i % 50 == 0:
print(f"Rank 0 | Step {i}/{len(train_loader)} | Loss: {losses.item():.4f}")
lr_scheduler.step()
if rank == 0:
avg_loss = epoch_loss / len(train_loader)
mlflow.log_metric("epoch_avg_loss", avg_loss, step=epoch)
print(f"Epoch {epoch+1} Finished! Avg Loss: {avg_loss:.4f} | Time: {(time.time() - start_time)/60:.2f} min")
dist.destroy_process_group()
train_distributed.distributed()
次のステップ
- サーバレス GPU コンピュートのベスト プラクティス
- サーバーレス GPU コンピュートの問題のトラブルシューティング
- マルチGPUおよびマルチノード分散トレーニング
- PyTorchでモデルをトレーニングする
- MLflowを使用した記録済みモデル