MLflow 3.0 deep learning example

This notebook runs a model training job, which is tracked as an MLflow Run, and stores a model checkpoint every 10 epochs. Each checkpoint is tracked as an MLflow LoggedModel, so you can then select the best checkpoint to deploy for production applications.

%pip install --upgrade --pre mlflow torch scikit-learn
dbutils.library.restartPython()
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

import mlflow
import mlflow.pytorch
from mlflow.entities import Dataset

# Helper function to prepare data
def prepare_data(df):
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y

# Helper function to compute accuracy
def compute_accuracy(model, X, y):
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy

# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(IrisClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target

# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)

# Load the training dataset with MLflow so that training metrics can be linked to it
train_dataset: Dataset = mlflow.data.from_pandas(train_df, name="train")
X_train, y_train = prepare_data(train_dataset.df)

# Define the PyTorch model and move it to the device
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
scripted_model = IrisClassifier(input_size, hidden_size, output_size).to(device)
scripted_model = torch.jit.script(scripted_model)

# Start a run to represent the training job
with mlflow.start_run(run_name="training_run"):
    # Training metrics logged in this run are linked to train_dataset (created above)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(scripted_model.parameters(), lr=0.01)

    # Move the training data to the device once before the training loop
    X_train, y_train = X_train.to(device), y_train.to(device)

    for epoch in range(101):
        out = scripted_model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint with metrics every 10 epochs
        if epoch % 10 == 0:
            # Each newly created LoggedModel checkpoint is linked with its
            # name, params, and step 
            model_info = mlflow.pytorch.log_model(
                pytorch_model=scripted_model,
                name=f"torch-iris-{epoch}",
                params={
                    "n_layers": 3,
                    "activation": "ReLU",
                    "criterion": "CrossEntropyLoss",
                    "optimizer": "Adam"
                },
                step=epoch,
                input_example=X_train.cpu().numpy(),
            )
            # Log metric on training dataset at step and link to LoggedModel
            mlflow.log_metric(
                key="accuracy",
                value=compute_accuracy(scripted_model, X_train, y_train),
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset
            )

This example produced one MLflow Run (training_run) and 11 MLflow LoggedModels, one for each checkpoint (at steps 0, 10, …, 100). Using the MLflow UI or the search API, you can retrieve the checkpoints and rank them by accuracy, as the next cells show; a server-side alternative is sketched after them.

ranked_checkpoints = mlflow.search_logged_models(output_format="list")
ranked_checkpoints.sort(
    key=lambda model: next((metric.value for metric in model.metrics if metric.key == "accuracy"), float('-inf')),
    reverse=True
)

best_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[0]
print(best_checkpoint.metrics[0])
best_checkpoint
worst_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[-1]
print(worst_checkpoint.metrics)
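
Depending on your MLflow version, search_logged_models can also rank results on the server through its order_by argument, instead of sorting client-side. A minimal sketch, assuming your MLflow 3 release supports ordering by metrics.accuracy:

# Sketch: ask the tracking server for checkpoints ranked by accuracy
# (assumes order_by support in your MLflow version)
top_checkpoints = mlflow.search_logged_models(
    order_by=[{"field_name": "metrics.accuracy", "ascending": False}],
    max_results=5,
    output_format="list",
)
for checkpoint in top_checkpoints:
    print(checkpoint.name, checkpoint.metrics)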

After selecting the best checkpoint, register that model in the model registry. You can also see the model ID, parameters, and metrics on the model version page in Catalog Explorer.
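
Databricks workspaces typically default the MLflow registry to Unity Catalog. If yours does not, you can target it explicitly before registering; a minimal sketch:

# Sketch: explicitly point MLflow at the Unity Catalog model registry
# (often already the default on Databricks, in which case this is a no-op)
mlflow.set_registry_uri("databricks-uc")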

# You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.

CATALOG = "main"
SCHEMA = "default"
MODEL = "dl_model"
MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL}"

uc_model_version = mlflow.register_model(f"models:/{best_checkpoint.model_id}", name=MODEL_NAME)

Now you can view the model version and all centralized performance data on the model version page in Unity Catalog. You can also get the same information using the API as shown in the following cell.

# Get the model version
from mlflow import MlflowClient
client = MlflowClient()
model_version = client.get_model_version(name=MODEL_NAME, version=uc_model_version.version)
print(model_version)
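
As a quick sanity check before deployment, you can load the registered version back and score the held-out split. A minimal sketch that reuses the prepare_data and compute_accuracy helpers defined above:

# Sketch: load the registered model version and evaluate it on the test split
loaded_model = mlflow.pytorch.load_model(f"models:/{MODEL_NAME}/{model_version.version}")
X_test, y_test = prepare_data(test_df)
print(f"Test accuracy: {compute_accuracy(loaded_model, X_test, y_test):.3f}")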