メインコンテンツまでスキップ

畳み込みニューラルネットワークを用いた画像分類

このノートブックでは 、MNIST データセット と PyTorch を使用して画像分類のために畳み込みニューラル ネットワークCNNPyTorch( ) を 方法を示します。MNIST データセットには、手書きの数字 (0 ~ 9) の 70,000 個のグレースケール画像が含まれており、画像分類技術の学習に最適です。

あなたは以下のことを学びます:

  • A10G GPU を使用してノートブックをサーバーレス GPU コンピュートに接続します
  • 単純な畳み込みニューラルネットワークのアーキテクチャを定義する
  • 単一の GPU でモデルをトレーニングし、メトリクスをMLflowにログ記録します
  • モデルのチェックポイントをUnity Catalogボリュームに保存する
  • 学習済みモデルをロードして評価する

サーバレスGPUコンピュートに接続する

このノートブックはニューラルネットワークを効率的にトレーニングするために GPU を必要とします。 サーバレス GPU コンピュートに接続するには、次のステップに従ってください。

  1. ノートブック上部の「 接続」 ドロップダウンをクリックしてください。
  2. サーバレス GPU を選択します。
  3. ノートブックの右側にある 「環境」 サイドパネルを開きます。
  4. このデモでは、 アクセラレータを A10 に設定してください。
  5. 「適用」 を選択し、 「確認」 をクリックすると、この環境がノートブックに適用されます。

詳細については、 「サーバレス GPU コンピュート」を参照してください。

MLflowをバージョン3.0以降にアップグレードしてください。

ディープラーニングのワークフローには、MLflow 3.0以降を推奨します。以下のセルは、MLflowをアップグレードし、変更を適用するためにPython環境を再起動します。

詳細については、 MLflow 3.0+ ディープラーニング ワークフローのベスト プラクティス」を参照してください。

Python
%pip install -U mlflow>=3
%restart_python

チェックポイントの保存場所を設定する

次のセルはウィジェットを作成します。モデルのチェックポイントがUnity Catalogのどこに保存されるかを指定します。 これらの論点は次のように定義します。

  • uc_catalogUnity Catalogカタログ名
  • uc_schemaカタログ内のスキーマ(データベース)
  • uc_volumeチェックポイントファイルを保存するボリューム
  • uc_model_nameこの特定のモデルに対応するボリューム内のサブディレクトリ

これらの値は、ノートブック全体でチェックポイントパスを構築するために使用されます。 /Volumes/{uc_catalog}/{uc_schema}/{uc_volume}/{uc_model_name}

以下のセルでは、プレースホルダー値をデフォルト値として使用します。ノートブック上部のウィジェットを使用して値を更新してください。または、次のセルでデフォルト値を直接更新してください。

Python
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "checkpoints")
dbutils.widgets.text("uc_model_name", "cnn_mnist")

畳み込みニューラルネットワークを定義する

以下のセルは、画像分類のためのシンプルなCNNアーキテクチャを定義しています。ネットワークは以下で構成されます。

  • 画像から特徴を抽出するために、最大プーリングを備えた2つの畳み込み層を使用する。
  • 抽出された特徴を分類するための2つの全結合層
  • 過学習を防ぐためのドロップアウト層

このコードでは、モデルとオプティマイザの状態をUnity Catalogボリュームにチェックポイントするためのヘルパークラスと、分散環境をセットアップするための関数も定義しています。 (マルチGPUシナリオで使用)。

この実装は、 Horovod PyTorch MNIST サンプルを基にしています。

Python
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
import torch.multiprocessing as mp
from datetime import timedelta
import os

from torch.distributed.fsdp import fully_shard
from torch.distributed.checkpoint.state_dict import get_state_dict, set_state_dict
from torch.distributed.checkpoint.stateful import Stateful

class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)

def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)

UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
UC_MODEL_NAME = dbutils.widgets.get("uc_model_name")

# Ensure that the UC Volume directory exists first
CHECKPOINT_DIR = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{UC_MODEL_NAME}"

class AppState(Stateful):
"""This is a useful wrapper for checkpointing the Application State. Since this object is compliant
with the Stateful protocol, DCP will automatically call state_dict/load_stat_dict as needed in the
dcp.save/load APIs.

Note: We take advantage of this wrapper to hande calling distributed state dict methods on the model
and optimizer.
"""

def __init__(self, model, optimizer=None):
self.model = model
self.optimizer = optimizer

def state_dict(self):
# this line automatically manages FSDP FQN's, as well as sets the default state dict type to FSDP.SHARDED_STATE_DICT
model_state_dict, optimizer_state_dict = get_state_dict(self.model, self.optimizer)
return {
"model": model_state_dict,
"optim": optimizer_state_dict
}

def load_state_dict(self, state_dict):
# sets our state dicts on the model and optimizer, now that we've loaded
set_state_dict(
self.model,
self.optimizer,
model_state_dict=state_dict["model"],
optim_state_dict=state_dict["optim"]
)

def setup():
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
# Shorter timeouts help surface failures quickly instead of hanging
dist.init_process_group(
backend="nccl",
timeout=timedelta(seconds=120),
init_method="env://",
rank=rank,
world_size=world_size,
)
torch.cuda.set_device(int(os.environ.get("LOCAL_RANK", 0)))
dist.barrier()
if rank == 0:
print("PG up; all ranks reached barrier")


def cleanup():
try:
dist.barrier()
finally:
dist.destroy_process_group()

トレーニングの設定

次のセルは、トレーニング用のハイパーパラメータを設定します。

  • batch_size: 各トレーニング反復で処理される画像の数
  • num_epochs: トレーニングデータセットを完全に通過した回数
  • momentumSGDオプティマイザのモメンタム係数
  • log_intervalトレーニングの進捗状況を記録する頻度
Python
# Specify training parameters
batch_size = 100
num_epochs = 3
momentum = 0.5
log_interval = 100

トレーニングループを定義する

次のセルは、 train_one_epoch関数を定義します。

  • トレーニングデータのバッチを反復処理します
  • 順伝播と逆伝播を実行する
  • オプティマイザを使用してモデルの重みを更新します
  • 定期的にMLflowにトレーニング損失をログ記録する
Python
def train_one_epoch(model, device, data_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(data_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(data_loader) * len(data),
100. * batch_idx / len(data_loader), loss.item()))
# Log metrics
mlflow.log_metric('loss', loss.item(), step=epoch * len(data_loader) + batch_idx)

単一のGPUでモデルをトレーニングする

次のセルは、以下の主要なトレーニング関数を定義します。

  • MNIST トレーニング データセットをロードします
  • モデルとオプティマイザを初期化します
  • 指定されたエポック数の間モデルをトレーニングする
  • 各エポック後にチェックポイントをUnity Catalogボリュームに保存します。
  • エクスペリメント追跡のためにメトリクスをMLflowに記録します
Python
import mlflow
import mlflow.pyfunc
import torch.optim as optim
from torchvision import datasets, transforms
from time import time

def train(learning_rate):

with mlflow.start_run() as run:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_dataset = datasets.MNIST(
'data',
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
with torch.no_grad():
input_example, _ = next(iter(data_loader))
output_example = model(input_example.to(device))

for epoch in range(1, num_epochs + 1):
train_one_epoch(model, device, data_loader, optimizer, epoch)

state_dict = { "app": AppState(model, optimizer) }
dcp.save(state_dict, checkpoint_id=CHECKPOINT_DIR)
print(f"saved checkpoint to {CHECKPOINT_DIR}")

トレーニング機能を実行します

次のセルは、学習率0.001でtrain関数を実行します。トレーニング プロセスでは次のことが行われます。

  • MNIST データセットをダウンロードします (まだキャッシュされていない場合)
  • 3エポック分のモデルをトレーニングする
  • トレーニングの進捗状況と損失値を表示する
  • モデルのチェックポイントをUnity Catalogボリュームに保存します
  • MLflowにメトリクスをログ記録する

トレーニングは通常、A10G GPUでは数分で完了します。

Python
train(learning_rate = 0.001)

学習済みモデルをロードして評価する

トレーニング後、チェックポイントからモデルをロードし、テストデータセットでその性能を評価できます。

次のセルは、 test関数を定義します。

  • Unity Catalogボリュームのチェックポイントからモデルの状態を読み込みます
  • MNISTテストデータセットをダウンロードします
  • テストデータでモデルを評価する
  • 平均テスト損失を計算して表示します。
Python
def test():
# Load model state from checkpoint using dcp
model = Net()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=momentum)
app_state = AppState(model, optimizer)
state_dict = { "app": app_state }
dcp.load(state_dict, checkpoint_id=CHECKPOINT_DIR)
model.eval()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
test_dataset = datasets.MNIST(
'data',
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(test_dataset)

test_loss = 0
for data, target in data_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target)

test_loss /= len(data_loader.dataset)
print("Average test loss: {}".format(test_loss.item()))

評価を実行する

次のセルは、 test関数を実行して、MNISTテストデータセットで学習済みモデルを評価します。テスト損失が低いほど、モデルの性能が良いことを示します。

Python
test()

結論

おめでとう!サーバレス GPU コンピュートを使用して画像分類モデルをトレーニングすることに成功しました。 あなたは以下のことを学びました:

  • サーバレスGPUコンピュートの設定と接続
  • 畳み込みニューラルネットワークのアーキテクチャを定義する
  • PyTorchでモデルをトレーニングし、メトリクスをMLflowにログ記録する
  • モデルのチェックポイントをUnity Catalogボリュームに保存する
  • 学習済みモデルをロードして評価する

GPUコンピュートからの切断

不要なGPU使用を避けるため、GPUから手動で切断してください。

  1. ノートブックの上部にある 「接続済み」 を選択します。
  2. サーバレス の上にマウスを移動します
  3. ドロップダウンメニューから 「終了」 を選択してください。
  4. 終了するには 「確認」 を選択してください。

:手動で切断しない場合、60分間操作がないと接続は自動的に切断されます。

次のステップ

Databricksにおける機械学習についてさらに詳しく知りたい場合は、以下のリソースをご覧ください。

サンプルノートブック

畳み込みニューラルネットワークを用いた画像分類

ノートブックを新しいタブで開く