メインコンテンツまでスキップ

畳み込みニューラルネットワークを用いた画像分類

このノートブックでは 、MNIST データセット と PyTorch を使用して画像分類のために畳み込みニューラル ネットワークCNNPyTorch( ) を 方法を示します。MNIST データセットには、手書きの数字 (0 ~ 9) のグレースケール画像が 70,000 枚含まれており、画像分類技術の学習に最適です。

以下の方法を学習します:

  • A10G GPU を搭載したサーバーレス GPU コンピュートにノートブックを接続する
  • 単純な畳み込みニューラルネットワークアーキテクチャを定義する
  • 単一の GPU でモデルをトレーニングし、メトリクスをMLflowにログ記録します
  • モデルのチェックポイントをUnity Catalogボリュームに保存する
  • トレーニング済みモデルをロードして評価する

サーバレスGPUコンピュートに接続する

このノートブックはニューラルネットワークを効率的にトレーニングするために GPU を必要とします。 サーバレス GPU コンピュートに接続するには、次のステップに従ってください。

  1. ノートブックの上部にある [接続 ] ドロップダウンをクリックします。
  2. サーバレス GPU を選択します。
  3. ノートブックの右側にある 環境 サイドパネルを開きます。
  4. このデモでは、 アクセラレータA10 に設定します。
  5. [適用] を選択し、 [確認] をクリックして、この環境をノートブックに適用します。

詳細については、 「サーバレス GPU コンピュート」を参照してください。

MLflowをバージョン3.0以上にアップグレードする

ディープラーニング ワークフローには MLflow 3.0 以上が推奨されます。次のセルは MLflow をアップグレードし、変更を適用するために Python 環境を再起動します。

詳細については、 MLflow 3.0+ ディープラーニング ワークフローのベスト プラクティス」を参照してください。

Python
%pip install -U mlflow>=3
%restart_python

チェックポイントの保存場所を構成する

次のセルはウィジェットを作成し、 Unity Catalog内でモデルのチェックポイントを保存する場所を指定します。 これらの論点は次のように定義します。

  • uc_catalog: Unity Catalogカタログ名
  • uc_schema: カタログ内のスキーマ(データベース)
  • uc_volume: チェックポイントファイルを保存するボリューム
  • uc_model_name: この特定のモデルのボリューム内のサブディレクトリ

これらの値は、ノートブック全体でチェックポイント パスを構築するために使用されます。 /Volumes/{uc_catalog}/{uc_schema}/{uc_volume}/{uc_model_name}

次のセルではプレースホルダー値がデフォルトとして使用されます。ノートブックの上部にあるウィジェットを使用して値を更新します。または、次のセルでデフォルト値を直接更新します。

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "checkpoints")
dbutils.widgets.text("uc_model_name", "cnn_mnist")

畳み込みニューラルネットワークを定義する

次のセルは、画像分類用の単純な CNN アーキテクチャを定義します。ネットワークは以下から構成されます:

  • 画像から特徴を抽出するための最大プーリングを備えた2つの畳み込み層
  • 抽出された特徴を分類するための2つの完全接続層
  • 過剰適合を防ぐためのドロップアウト層

このコードは、モデルとオプティマイザーの状態をUnity Catalogボリュームにチェックポイントするためのヘルパー クラスと、分散トレーニング (マルチ GPU シナリオで使用) をセットアップするための関数も定義します。

この実装は、 Horovod PyTorch MNIST の例を基にしています。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
import torch.multiprocessing as mp
from datetime import timedelta
import os

from torch.distributed.fsdp import fully_shard
from torch.distributed.checkpoint.state_dict import get_state_dict, set_state_dict
from torch.distributed.checkpoint.stateful import Stateful

class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
UC_MODEL_NAME = dbutils.widgets.get("uc_model_name")

# Ensure that the UC Volume directory exists first
CHECKPOINT_DIR = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{UC_MODEL_NAME}"

class AppState(Stateful):
"""This is a useful wrapper for checkpointing the Application State. Since this object is compliant
with the Stateful protocol, DCP will automatically call state_dict/load_stat_dict as needed in the
dcp.save/load APIs.

Note: We take advantage of this wrapper to hande calling distributed state dict methods on the model
and optimizer.
"""

def __init__(self, model, optimizer=None):
self.model = model
self.optimizer = optimizer

def state_dict(self):
# this line automatically manages FSDP FQN's, as well as sets the default state dict type to FSDP.SHARDED_STATE_DICT
model_state_dict, optimizer_state_dict = get_state_dict(self.model, self.optimizer)
return {
"model": model_state_dict,
"optim": optimizer_state_dict
}

def load_state_dict(self, state_dict):
# sets our state dicts on the model and optimizer, now that we've loaded
set_state_dict(
self.model,
self.optimizer,
model_state_dict=state_dict["model"],
optim_state_dict=state_dict["optim"]
)

def setup():
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
# Shorter timeouts help surface failures quickly instead of hanging
dist.init_process_group(
backend="nccl",
timeout=timedelta(seconds=120),
init_method="env://",
rank=rank,
world_size=world_size,
)
torch.cuda.set_device(int(os.environ.get("LOCAL_RANK", 0)))
dist.barrier()
if rank == 0:
print("PG up; all ranks reached barrier")


def cleanup():
try:
dist.barrier()
finally:
dist.destroy_process_group()

トレーニングの設定

次のセルは、トレーニングのハイパーパラメータを設定します。

  • batch_size: 各トレーニング反復で処理される画像の数
  • num_epochs: トレーニングデータセットの完全なパス数
  • momentum: SGDオプティマイザーのモメンタム係数
  • log_interval: トレーニングの進捗状況を記録する頻度
Python
# Specify training parameters
batch_size = 100
num_epochs = 3
momentum = 0.5
log_interval = 100

トレーニングループを定義する

次のセルは、次のtrain_one_epoch関数を定義します。

  • トレーニングデータのバッチを反復処理する
  • 前方伝播と後方伝播を実行する
  • オプティマイザーを使用してモデルの重みを更新します
  • トレーニング損失を定期的に MLflow に記録します
Python
def train_one_epoch(model, device, data_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(data_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(data_loader) * len(data),
100. * batch_idx / len(data_loader), loss.item()))
# Log metrics
mlflow.log_metric('loss', loss.item(), step=epoch * len(data_loader) + batch_idx)

単一のGPUでモデルをトレーニングする

次のセルは、次のようなメインのトレーニング関数を定義します。

  • MNIST トレーニング データセットをロードします
  • モデルとオプティマイザーを初期化する
  • 指定されたエポック数だけモデルをトレーニングする
  • 各エポックの後にチェックポイントをUnity Catalogボリュームに保存します
  • エクスペリメント追跡のためにメトリクスをMLflowに記録します
Python
import mlflow
import mlflow.pyfunc
import torch.optim as optim
from torchvision import datasets, transforms
from time import time

def train(learning_rate):

with mlflow.start_run() as run:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_dataset = datasets.MNIST(
'data',
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
with torch.no_grad():
input_example, _ = next(iter(data_loader))
output_example = model(input_example.to(device))

for epoch in range(1, num_epochs + 1):
train_one_epoch(model, device, data_loader, optimizer, epoch)

state_dict = { "app": AppState(model, optimizer) }
dcp.save(state_dict, checkpoint_id=CHECKPOINT_DIR)
print(f"saved checkpoint to {CHECKPOINT_DIR}")

トレーニング機能を実行する

次のセルは、学習率 0.001 でtrain関数を実行します。トレーニング プロセスでは次のことが行われます。

  • MNIST データセットをダウンロードします (まだキャッシュされていない場合)
  • モデルを3エポックトレーニングする
  • トレーニングの進捗状況と損失値を表示する
  • モデルのチェックポイントをUnity Catalogボリュームに保存する
  • MLflowにメトリクスを記録する

A10G GPU でのトレーニングには通常数分かかります。

Python
train(learning_rate = 0.001)

トレーニング済みモデルをロードして評価する

トレーニング後、チェックポイントからモデルをロードし、テスト データセットでそのパフォーマンスを評価できます。

次のセルは、次のtest関数を定義します。

  • Unity Catalogボリュームチェックポイントからモデルの状態をロードします
  • MNISTテストデータセットをダウンロードする
  • テストデータでモデルを評価する
  • 平均テスト損失を計算して表示します
Python
def test():
# Load model state from checkpoint using dcp
model = Net()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=momentum)
app_state = AppState(model, optimizer)
state_dict = { "app": app_state }
dcp.load(state_dict, checkpoint_id=CHECKPOINT_DIR)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
test_dataset = datasets.MNIST(
'data',
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(test_dataset)

test_loss = 0
for data, target in data_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target)

test_loss /= len(data_loader.dataset)
print("Average test loss: {}".format(test_loss.item()))

評価を実行する

次のセルは、 test関数を実行して、MNIST テスト データセットでトレーニング済みモデルを評価します。テスト損失が低いほど、モデルのパフォーマンスが優れていることを示します。

Python
test()

結論

おめでとう!サーバレス GPU コンピュートを使用して画像分類モデルをトレーニングすることに成功しました。 以下の方法を学びました:

  • サーバレスGPUコンピュートの設定と接続
  • 畳み込みニューラルネットワークアーキテクチャを定義する
  • PyTorchでモデルをトレーニングし、メトリクスをMLflowにログ記録する
  • モデルのチェックポイントをUnity Catalogボリュームに保存する
  • トレーニング済みモデルを読み込んで評価する

GPUコンピュートからの切断

不要な GPU の使用を避けるには、GPU から手動で切断します。

  1. ノートブックの上部にある 「接続済み」 を選択します
  2. サーバレス の上にマウスを移動します
  3. ドロップダウンメニューから 「終了」 を選択します
  4. 終了するには 「確認」 を選択してください

注意 : 手動で切断しない場合は、60 分間操作が行われないと接続は自動的に終了します。

次のステップ

Databricks での機械学習について詳しくは、次のリソースをご覧ください。

サンプルノートブック

畳み込みニューラルネットワークを用いた画像分類

ノートブックを新しいタブで開く