メインコンテンツまでスキップ

はじめに: Databricks で初めての機械学習モデルを構築する

このサンプルノートブックは、Databricks上で機械学習分類モデルをトレーニングする方法を示しています。Databricks Runtime for Machine Learning には、トレーニングおよび前処理アルゴリズム用のscikit-learn 、モデル開発プロセスを追跡するMLflow 、ハイパーチューニングをスケールする Optuna など、多くのライブラリがプリインストールされています。

このノートブックでは、ワインが「高品質」とみなされるかどうかを予測する分類モデルを作成します。このデータセットは、様々なワインの11の特徴量(例えば、アルコール度数、酸度、残糖量など)と、1から10までの品質ランキングで構成されています。

このチュートリアルでは、以下の内容を扱います。

  • パート1:MLflowトラッキングを使用して分類モデルをトレーニングする
  • パート 2: モデルのパフォーマンスを向上させるためのハイパーチューニング
  • パート3:結果とモデルをUnity Catalogに保存する
  • パート4:モデルのデプロイ

Databricks 上で機械学習を実運用化する方法に関する詳細(モデルライフサイクル管理やモデル推論など)については、 「ML エンドツーエンドの例」を参照してください。

このデータセットはUCI機械学習リポジトリから入手可能で、 「物理化学的特性からのデータマイニングによるワイン嗜好のモデリング」 [Cortez et al., 2009]に掲載されています。

要件

設定

このセクションでは、以下の操作を行います。

  • Unity Catalogモデルレジストリとして使用するようにMLflowクライアントを構成します。
  • モデルを登録するカタログとスキーマを設定します。
  • データを読み込み、Unity Catalogのテーブルに保存します。
  • データを前処理する。

MLflowクライアントの設定

もちろん、 MLflow PythonクライアントはDatabricksワークスペース モデル レジストリにモデルを作成します。 Unity Catalogにモデルを保存するには、次のセルに示すようにMLflowクライアントを設定します。

Python
import mlflow
mlflow.set_registry_uri("databricks-uc")

次のセルでは、モデルを登録するカタログとスキーマを設定します。カタログに対してUSE CATALOG権限、スキーマに対してUSE_SCHEMA、CREATE_TABLE、およびCREATE_MODEL権限が必要です。必要に応じて、次のセル内のカタログ名とスキーマ名を変更してください。

詳細については、 Unity Catalogドキュメントを参照してください。

Python
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"

データを読み込み、 Unity Catalogのテーブルに保存する

データセットはdatabricks-datasetsで入手可能です。次のセルでは、 .csvファイルからデータを読み込み、 Spark DataFramesに格納します。 次に、 DataFrames Unity Catalogのテーブルに書き込みます。 これにより、データが永続的に保存されるだけでなく、他者との共有方法も制御できるようになります。

Python
white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)

# Remove the spaces from the column names
for c in white_wine.columns:
white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))

# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"

# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")

データの前処理

Python
# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble

import matplotlib.pyplot as plt

import optuna
from mlflow.optuna.storage import MlflowStorage
from mlflow.pyspark.optuna.study import MlflowSparkStudy
Python
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()

# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)

# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)

# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
data_df,
data_labels,
test_size=0.2,
random_state=1
)

パート 1. 分類モデルをトレーニングする

Python
# Enable MLflow autologging for this notebook
mlflow.autolog()

次に、 MLflow実行のコンテキスト内で分類器をトレーニングします。これにより、トレーニングされたモデルと、関連する多くのメトリクスと問題が自動的に記録されます。

テスト データセットのモデルの AUC スコアなど、追加のメトリクスでログを補足できます。

Python
with mlflow.start_run(run_name='gradient_boost') as run:
model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)

# Models, parameters, and training metrics are tracked automatically
model.fit(X_train, y_train)

predicted_probs = model.predict_proba(X_test)
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)

# Save the ROC curve plot to a file
roc_curve.figure_.savefig("roc_curve.png")

# The AUC score on test data is not automatically logged, so log it manually
mlflow.log_metric("test_auc", roc_auc)

# Log the ROC curve image file as an artifact
mlflow.log_artifact("roc_curve.png")

print("Test AUC of: {}".format(roc_auc))

MLflowの実行結果を表示する

記録されたトレーニング実行を表示するには、 「体験」 アイコンをクリックします。エクスペリメントアイコンノートブックの右上にある をクリックして、体験サイドバーを表示します。 必要に応じて、更新アイコンをクリックして最新の実行結果を取得および監視してください。

体験は右側のサイドバーに記載されています

より詳細なMLflowエクスペリメント ページを表示するには、エクスペリメント ページ アイコンをクリックします。 このページでは、実行を比較し、特定の実行の詳細を表示できます。 MLflow を使用したトラックモデル開発を参照してください。

ロードモデル

MLflow API を使用すると、特定の実行結果にアクセスすることもできます。次のセル内のコードは、特定のMLflow実行でトレーニングされたモデルをロードし、それを使用して予測を行う方法を示しています。 MLflowの実行ページでは、特定のモデルを読み込むためのコードスニペットも見つけることができます。

Python
# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
'runs:/{run_id}/model'.format(
run_id=run.info.run_id
)
)

predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)

# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))

パート 2. ハイパーチューニング

ここまでで、シンプルなモデルをトレーニングし、MLflowトラッキングサービスを使用して作業を整理しました。次に、Optunaを使用してより高度なチューニングを行うことができます。

Optunaを使用した並列トレーニング

Optuna は、複数のコンピュート リソースにわたって水平方向にスケーリングできるハイパーチューニング用のオープンソースPythonライブラリです。 Databricksでの Optuna の使用の詳細については、 「Optuna によるハイパーチューニング」を参照してください。

Python
def objective(trial):
# Enable autologging on each worker
mlflow.autolog()
with mlflow.start_run(nested=True):
params = {
'n_estimators': trial.suggest_int('n_estimators', 20, 1000),
'learning_rate': trial.suggest_float('learning_rate', 0.05, 1.0, log=True),
'max_depth': trial.suggest_int('max_depth', 2, 5),
}
model_hp = sklearn.ensemble.GradientBoostingClassifier(
random_state=0,
**params
)
model_hp.fit(X_train, y_train)
predicted_probs = model_hp.predict_proba(X_test)
# Tune based on the test AUC
# In production, you could use a separate validation set instead
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
mlflow.log_metric('test_auc', roc_auc)

# Negate the AUC because Optuna minimizes the objective by default
return -roc_auc


with mlflow.start_run(run_name='gb_optuna') as run:
# Use the MLflow Tracking Server as the Optuna storage backend
experiment_id = mlflow.active_run().info.experiment_id
mlflow_storage = MlflowStorage(experiment_id=experiment_id)

# MlflowSparkStudy distributes the tuning using Spark workers
mlflow_study = MlflowSparkStudy(
study_name="gb-optuna-tuning",
storage=mlflow_storage,
)

mlflow_study.optimize(objective, n_trials=32, n_jobs=4)

最適なモデルを取得するために検索が実行されます

すべての実行はMLflowによって追跡されるため、 MLflow検索実行APIを使用してメトリクスと最良の実行の確保を取得し、最高のテスト実行を伴うチューニング実行を見つけることができます。

この調整済みモデルは、パート1で学習させたより単純なモデルよりも優れた性能を発揮するはずです。

Python
# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
order_by=['metrics.test_auc DESC', 'start_time DESC'],
max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))

best_model_pyfunc = mlflow.pyfunc.load_model(
'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
)

# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)

パート3:結果とモデルをUnity Catalogに保存する

Python
predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")

results = spark.createDataFrame(best_model_predictions)

# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
Python
model_uri = 'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)

mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")

パート4. モデルのデプロイ

モデルをUnity Catalogに保存した後、Serving UIを使用してデプロイできます。 以下の説明は、その概要を示すものです。詳細については、 「カスタム モデルサービング エンドポイントの作成」を参照してください。

  1. サイドバーの サービング をクリックして、Serving UIを表示します。

モデルサービングUI

  1. サービングエンドポイントの作成 をクリックします。

  2. 名前」 フィールドにエンドポイントの名前を入力してください。

  3. Served entities セクションで

    1. エンティティ フィールドをクリックして、 サービング済みエンティティの選択 フォームを開きます。
    2. 「マイモデル - Unity Catalog を選択してください。 選択内容に基づいてフォームが動的に更新されます。
    3. 提供するwine_quality_modelとモデルバージョンを選択してください。
    4. サービス提供モデルにルーティングするトラフィックの割合として 100を 選択してください。
    5. この例では、コンピュート タイプとして CPU を選択します。
    6. [コンピュート スケールアウト] で、コンピュート スケール アウト サイズとして [小] を選択します。
  4. 「作成」 をクリックします。「 エンドポイントの提供」 ページが表示され、 「エンドポイントの提供」の状態 が「 準備完了ではありません」 と表示されます。

  5. エンドポイントの 準備 が完了したら、 「使用」 を選択して、エンドポイントに推論リクエストを送信します。

サンプルノートブック

はじめに: Databricks で初めての機械学習モデルを構築する

ノートブックを新しいタブで開く