%md
# MLflow 3.0 deep learning example

This notebook first runs a model training job, which is tracked as an MLflow Run. It stores a model checkpoint every 10 epochs. Each checkpoint is tracked as an MLflow `LoggedModel`. You can then select the best checkpoint to deploy for production applications.
%pip install mlflow --upgrade --pre torch scikit-learn

dbutils.library.restartPython()
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

import mlflow
import mlflow.pytorch
from mlflow.entities import Dataset


# Helper function to prepare data
def prepare_data(df):
    X = torch.tensor(df.iloc[:, :-1].values, dtype=torch.float32)
    y = torch.tensor(df.iloc[:, -1].values, dtype=torch.long)
    return X, y


# Helper function to compute accuracy
def compute_accuracy(model, X, y):
    with torch.no_grad():
        outputs = model(X)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y).sum().item() / y.size(0)
    return accuracy


# Define a basic PyTorch classifier
class IrisClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(IrisClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x


# Load Iris dataset and prepare the DataFrame
iris = load_iris()
iris_df = pd.DataFrame(data=iris.data, columns=iris.feature_names)
iris_df['target'] = iris.target

# Split into training and testing datasets
train_df, test_df = train_test_split(iris_df, test_size=0.2, random_state=42)

# Prepare training data
train_dataset = mlflow.data.from_pandas(train_df, name="train")
X_train, y_train = prepare_data(train_dataset.df)

# Define the PyTorch model and move it to the device
input_size = X_train.shape[1]
hidden_size = 16
output_size = len(iris.target_names)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
scripted_model = IrisClassifier(input_size, hidden_size, output_size).to(device)
scripted_model = torch.jit.script(scripted_model)

# Start a run to represent the training job
with mlflow.start_run():
    # Load the training dataset with MLflow. We will link training metrics to this dataset.
    train_dataset: Dataset = mlflow.data.from_pandas(train_df, name="train")
    X_train, y_train = prepare_data(train_dataset.df)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(scripted_model.parameters(), lr=0.01)

    for epoch in range(101):
        X_train, y_train = X_train.to(device), y_train.to(device)
        out = scripted_model(X_train)
        loss = criterion(out, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Log a checkpoint with metrics every 10 epochs
        if epoch % 10 == 0:
            # Each newly created LoggedModel checkpoint is linked with its
            # name, params, and step
            model_info = mlflow.pytorch.log_model(
                pytorch_model=scripted_model,
                name=f"torch-iris-{epoch}",
                params={
                    "n_layers": 3,
                    "activation": "ReLU",
                    "criterion": "CrossEntropyLoss",
                    "optimizer": "Adam",
                },
                step=epoch,
                # Move to CPU before converting to NumPy so this also works on GPU
                input_example=X_train.cpu().numpy(),
            )
            # Log metric on training dataset at step and link to LoggedModel
            mlflow.log_metric(
                key="accuracy",
                value=compute_accuracy(scripted_model, X_train, y_train),
                step=epoch,
                model_id=model_info.model_id,
                dataset=train_dataset,
            )
%md
This example produced one MLflow Run (`training_run`) and 11 MLflow Logged Models, one for each checkpoint (at steps 0, 10, …, 100). Using MLflow’s UI or search API, you can get the checkpoints and rank them by their accuracy.
ranked_checkpoints = mlflow.search_logged_models(output_format="list")

# Sort checkpoints by their logged accuracy, best first; checkpoints
# without an accuracy metric sort to the end.
ranked_checkpoints.sort(
    key=lambda model: next(
        (metric.value for metric in model.metrics if metric.key == "accuracy"),
        float('-inf'),
    ),
    reverse=True,
)

best_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[0]
print(best_checkpoint.metrics[0])
best_checkpoint
worst_checkpoint: mlflow.entities.LoggedModel = ranked_checkpoints[-1]
print(worst_checkpoint.metrics)
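%md
Alternatively, you can have the tracking server rank the checkpoints instead of sorting them client-side. The following cell is a minimal sketch that assumes the `order_by` parameter of `mlflow.search_logged_models`; it requests the checkpoints ordered by their logged `accuracy` metric, best first.

# Server-side ranking sketch: ask the tracking server to order checkpoints
# by the "accuracy" metric, highest first, instead of sorting in Python.
top_checkpoints = mlflow.search_logged_models(
    order_by=[{"field_name": "metrics.accuracy", "ascending": False}],
    output_format="list",
)
print(top_checkpoints[0].name)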
%md
After selecting the best checkpoint model, register that model to the model registry. You can also see the model ID, parameters, and metrics on the model version page in Catalog Explorer.
# You must have `USE CATALOG` privileges on the catalog, and you must have
# `USE SCHEMA` privileges on the schema.
# If necessary, change the catalog and schema name here.
CATALOG = "main"
SCHEMA = "default"
MODEL = "dl_model"
MODEL_NAME = f"{CATALOG}.{SCHEMA}.{MODEL}"

uc_model_version = mlflow.register_model(f"models:/{best_checkpoint.model_id}", name=MODEL_NAME)
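%md
Optionally, point an alias at the newly registered version so downstream consumers can reference it by a stable name. This is a minimal sketch; the `champion` alias is an arbitrary choice, not part of the original workflow.

from mlflow import MlflowClient

# Hypothetical "champion" alias: lets callers load the model as
# models:/<MODEL_NAME>@champion without tracking version numbers.
client = MlflowClient()
client.set_registered_model_alias(
    name=MODEL_NAME, alias="champion", version=uc_model_version.version
)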
%md
Now you can view the model version and all centralized performance data on the model version page in Unity Catalog. You can also get the same information using the API as shown in the following cell.
# Get the model version
from mlflow import MlflowClient

client = MlflowClient()
model_version = client.get_model_version(name=MODEL_NAME, version=uc_model_version.version)
print(model_version)
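%md
As a final check, you can load the registered version back and score the held-out test split. This is a minimal sketch: it loads the model through the generic `pyfunc` flavor and reuses the `test_df` split created during training above.

# Load the registered model version via the generic pyfunc flavor and score
# the held-out test rows (features only, cast to float32 to match the
# model's input example).
loaded_model = mlflow.pyfunc.load_model(f"models:/{MODEL_NAME}/{uc_model_version.version}")
predictions = loaded_model.predict(test_df.drop(columns=["target"]).astype("float32"))
print(predictions[:5])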