畳み込みニューラルネットワークを用いた画像分類
このノートブックでは 、MNIST データセット と PyTorch を使用して画像分類のために畳み込みニューラル ネットワークCNNPyTorch( ) を 方法を示します。MNIST データセットには、手書きの数字 (0 ~ 9) のグレースケール画像が 70,000 枚含まれており、画像分類技術の学習に最適です。
以下の方法を学習します:
- A10G GPU を搭載したサーバーレス GPU コンピュートにノートブックを接続する
- 単純な畳み込みニューラルネットワークアーキテクチャを定義する
- 単一の GPU でモデルをトレーニングし、メトリクスをMLflowにログ記録します
- モデルのチェックポイントをUnity Catalogボリュームに保存する
- トレーニング済みモデルをロードして評価する
サーバレスGPUコンピュートに接続する
このノートブックはニューラルネットワークを効率的にトレーニングするために GPU を必要とします。 サーバレス GPU コンピュートに接続するには、次のステップに従ってください。
- ノートブックの上部にある [接続 ] ドロップダウンをクリックします。
- サーバレス GPU を選択します。
- ノートブックの右側にある 環境 サイドパネルを開きます。
- このデモでは、 アクセラレータ を A10 に設定します。
- [適用] を選択し、 [確認] をクリックして、この環境をノートブックに適用します。
詳細については、 「サーバレス GPU コンピュート」を参照してください。
MLflowをバージョン3.0以上にアップグレードする
ディープラーニング ワークフローには MLflow 3.0 以上が推奨されます。次のセルは MLflow をアップグレードし、変更を適用するために Python 環境を再起動します。
詳細については、 MLflow 3.0+ ディープラーニング ワークフローのベスト プラクティス」を参照してください。
%pip install -U mlflow>=3
%restart_python
チェックポイントの保存場所を構成する
次のセルはウィジェットを作成し、 Unity Catalog内でモデルのチェックポイントを保存する場所を指定します。 これらの論点は次のように定義します。
uc_catalog: Unity Catalogカタログ名uc_schema: カタログ内のスキーマ(データベース)uc_volume: チェックポイントファイルを保存するボリュームuc_model_name: この特定のモデルのボリューム内のサブディレクトリ
これらの値は、ノートブック全体でチェックポイント パスを構築するために使用されます。 /Volumes/{uc_catalog}/{uc_schema}/{uc_volume}/{uc_model_name}
次のセルではプレースホルダー値がデフォルトとして使用されます。ノートブックの上部にあるウィジェットを使用して値を更新します。または、次のセルでデフォルト値を直接更新します。
dbutils.widgets.text("uc_catalog", "main")
dbutils.widgets.text("uc_schema", "default")
dbutils.widgets.text("uc_volume", "checkpoints")
dbutils.widgets.text("uc_model_name", "cnn_mnist")
畳み込みニューラルネットワークを定義する
次のセルは、画像分類用の単純な CNN アーキテクチャを定義します。ネットワークは以下から構成されます:
- 画像から特徴を抽出するための最大プーリングを備えた2つの畳み込み層
- 抽出された特徴を分類するための2つの完全接続層
- 過剰適合を防ぐためのドロップアウト層
このコードは、モデルとオプティマイザーの状態をUnity Catalogボリュームにチェックポイントするためのヘルパー クラスと、分散トレーニング (マルチ GPU シナリオで使用) をセットアップするための関数も定義します。
この実装は、 Horovod PyTorch MNIST の例を基にしています。
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import torch.distributed as dist
import torch.distributed.checkpoint as dcp
import torch.multiprocessing as mp
from datetime import timedelta
import os
from torch.distributed.fsdp import fully_shard
from torch.distributed.checkpoint.state_dict import get_state_dict, set_state_dict
from torch.distributed.checkpoint.stateful import Stateful
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x, dim=1)
UC_CATALOG = dbutils.widgets.get("uc_catalog")
UC_SCHEMA = dbutils.widgets.get("uc_schema")
UC_VOLUME = dbutils.widgets.get("uc_volume")
UC_MODEL_NAME = dbutils.widgets.get("uc_model_name")
# Ensure that the UC Volume directory exists first
CHECKPOINT_DIR = f"/Volumes/{UC_CATALOG}/{UC_SCHEMA}/{UC_VOLUME}/{UC_MODEL_NAME}"
class AppState(Stateful):
"""This is a useful wrapper for checkpointing the Application State. Since this object is compliant
with the Stateful protocol, DCP will automatically call state_dict/load_stat_dict as needed in the
dcp.save/load APIs.
Note: We take advantage of this wrapper to hande calling distributed state dict methods on the model
and optimizer.
"""
def __init__(self, model, optimizer=None):
self.model = model
self.optimizer = optimizer
def state_dict(self):
# this line automatically manages FSDP FQN's, as well as sets the default state dict type to FSDP.SHARDED_STATE_DICT
model_state_dict, optimizer_state_dict = get_state_dict(self.model, self.optimizer)
return {
"model": model_state_dict,
"optim": optimizer_state_dict
}
def load_state_dict(self, state_dict):
# sets our state dicts on the model and optimizer, now that we've loaded
set_state_dict(
self.model,
self.optimizer,
model_state_dict=state_dict["model"],
optim_state_dict=state_dict["optim"]
)
def setup():
rank = int(os.environ["RANK"])
world_size = int(os.environ["WORLD_SIZE"])
# Shorter timeouts help surface failures quickly instead of hanging
dist.init_process_group(
backend="nccl",
timeout=timedelta(seconds=120),
init_method="env://",
rank=rank,
world_size=world_size,
)
torch.cuda.set_device(int(os.environ.get("LOCAL_RANK", 0)))
dist.barrier()
if rank == 0:
print("PG up; all ranks reached barrier")
def cleanup():
try:
dist.barrier()
finally:
dist.destroy_process_group()
トレーニングの設定
次のセルは、トレーニングのハイパーパラメータを設定します。
batch_size: 各トレーニング反復で処理される画像の数num_epochs: トレーニングデータセットの完全なパス数momentum: SGDオプティマイザーのモメンタム係数log_interval: トレーニングの進捗状況を記録する頻度
# Specify training parameters
batch_size = 100
num_epochs = 3
momentum = 0.5
log_interval = 100
トレーニングループを定義する
次のセルは、次のtrain_one_epoch関数を定義します。
- トレーニングデータのバッチを反復処理する
- 前方伝播と後方伝播を実行する
- オプティマイザーを使用してモデルの重みを更新します
- トレーニング損失を定期的に MLflow に記録します
def train_one_epoch(model, device, data_loader, optimizer, epoch):
model.train()
for batch_idx, (data, target) in enumerate(data_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(data_loader) * len(data),
100. * batch_idx / len(data_loader), loss.item()))
# Log metrics
mlflow.log_metric('loss', loss.item(), step=epoch * len(data_loader) + batch_idx)
単一のGPUでモデルをトレーニングする
次のセルは、次のようなメインのトレーニング関数を定義します。
- MNIST トレーニング データセットをロードします
- モデルとオプティマイザーを初期化する
- 指定されたエポック数だけモデルをトレーニングする
- 各エポックの後にチェックポイントをUnity Catalogボリュームに保存します
- エクスペリメント追跡のためにメトリクスをMLflowに記録します
import mlflow
import mlflow.pyfunc
import torch.optim as optim
from torchvision import datasets, transforms
from time import time
def train(learning_rate):
with mlflow.start_run() as run:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_dataset = datasets.MNIST(
'data',
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
model = Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
with torch.no_grad():
input_example, _ = next(iter(data_loader))
output_example = model(input_example.to(device))
for epoch in range(1, num_epochs + 1):
train_one_epoch(model, device, data_loader, optimizer, epoch)
state_dict = { "app": AppState(model, optimizer) }
dcp.save(state_dict, checkpoint_id=CHECKPOINT_DIR)
print(f"saved checkpoint to {CHECKPOINT_DIR}")
トレーニング機能を実行する
次のセルは、学習率 0.001 でtrain関数を実行します。トレーニング プロセスでは次のことが行われます。
- MNIST データセットをダウンロードします (まだキャッシュされていない場合)
- モデルを3エポックトレーニングする
- トレーニングの進捗状況と損失値を表示する
- モデルのチェックポイントをUnity Catalogボリュームに保存する
- MLflowにメトリクスを記録する
A10G GPU でのトレーニングには通常数分かかります。
train(learning_rate = 0.001)
トレーニング済みモデルをロードして評価する
トレーニング後、チェックポイントからモデルをロードし、テスト データセットでそのパフォーマンスを評価できます。
次のセルは、次のtest関数を定義します。
- Unity Catalogボリュームチェックポイントからモデルの状態をロードします
- MNISTテストデータセットをダウンロードする
- テストデータでモデルを評価する
- 平均テスト損失を計算して表示します
def test():
# Load model state from checkpoint using dcp
model = Net()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=momentum)
app_state = AppState(model, optimizer)
state_dict = { "app": app_state }
dcp.load(state_dict, checkpoint_id=CHECKPOINT_DIR)
model.eval()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
test_dataset = datasets.MNIST(
'data',
train=False,
download=True,
transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]))
data_loader = torch.utils.data.DataLoader(test_dataset)
test_loss = 0
for data, target in data_loader:
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += F.nll_loss(output, target)
test_loss /= len(data_loader.dataset)
print("Average test loss: {}".format(test_loss.item()))
評価を実行する
次のセルは、 test関数を実行して、MNIST テスト データセットでトレーニング済みモデルを評価します。テスト損失が低いほど、モデルのパフォーマンスが優れていることを示します。
test()
結論
おめでとう!サーバレス GPU コンピュートを使用して画像分類モデルをトレーニングすることに成功しました。 以下の方法を学びました:
- サーバレスGPUコンピュートの設定と接続
- 畳み込みニューラルネットワークアーキテクチャを定義する
- PyTorchでモデルをトレーニングし、メトリクスをMLflowにログ記録する
- モデルのチェックポイントをUnity Catalogボリュームに保存する
- トレーニング済みモデルを読み込んで評価する
GPUコンピュートからの切断
不要な GPU の使用を避けるには、GPU から手動で切断します。
- ノートブックの上部にある 「接続済み」 を選択します
- サーバレス の上にマウスを移動します
- ドロップダウンメニューから 「終了」 を選択します
- 終了するには 「確認」 を選択してください
注意 : 手動で切断しない場合は、60 分間操作が行われないと接続は自動的に終了します。
次のステップ
Databricks での機械学習について詳しくは、次のリソースをご覧ください。
- サーバレス GPU コンピュートのベスト プラクティス
- サーバーレス GPU コンピュートの問題のトラブルシューティング
- マルチGPUおよびマルチノード分散トレーニング
- MLflowトラッキング
- PyTorchでモデルをトレーニングする