さあ始めましょう:Databricksで最初の機械学習モデルを構築しましょう
このサンプルノートブックは、Databricks上で機械学習分類モデルをトレーニングする方法を示しています。Databricks Runtime for Machine Learning には、トレーニングおよび前処理アルゴリズム用のscikit-learn 、モデル開発プロセスを追跡するMLflow 、ハイパーチューニングをスケールするための SparkTrials を備えたHyperoptなど、多くのライブラリがプリインストールされています。
このノートブックでは、ワインが「高品質」とみなされるかどうかを予測する分類モデルを作成します。このデータセットは、様々なワインの11の特徴量(例えば、アルコール度数、酸度、残糖量など)と、1から10までの品質ランキングで構成されています。
このチュートリアルでは、以下の内容を扱います。
- パート1:MLflowトラッキングを使用して分類モデルをトレーニングする
- パート 2: モデルのパフォーマンスを向上させるためのハイパーチューニング
- パート3:結果とモデルをUnity Catalogに保存する
- パート4:モデルのデプロイ
Databricks 上で機械学習を実運用化する方法に関する詳細(モデルライフサイクル管理やモデル推論など)については、 「ML エンドツーエンドの例」を参照してください。
このデータセットはUCI機械学習リポジトリから入手可能で、 「物理化学的特性からのデータマイニングによるワイン嗜好のモデリング」 [Cortez et al., 2009]に掲載されています。
要件
- クラスターは、以下のいずれかのDatabricks Runtimeバージョン(13.3 LTS ML、14.3 LTS ML、15.4 LTS ML、または16.4 LTS ML)を実行している必要があります。Databricks Runtime 17.3 LTS ML 以降については、 「チュートリアル: Databricks で最初の機械学習モデルを構築する」を参照してください。
設定
このセクションでは、以下の操作を行います。
- Unity Catalogモデルレジストリとして使用するようにMLflowクライアントを構成します。
- モデルを登録するカタログとスキーマを設定します。
- データを読み込み、Unity Catalogのテーブルに保存します。
- データを前処理する。
MLflowクライアントの設定
もちろん、 MLflow PythonクライアントはDatabricksワークスペース モデル レジストリにモデルを作成します。 Unity Catalogにモデルを保存するには、次のセルに示すようにMLflowクライアントを設定します。
import mlflow
mlflow.set_registry_uri("databricks-uc")
次のセルでは、モデルを登録するカタログとスキーマを設定します。カタログに対してUSE CATALOG権限、スキーマに対してUSE_SCHEMA、CREATE_TABLE、およびCREATE_MODEL権限が必要です。必要に応じて、次のセル内のカタログ名とスキーマ名を変更してください。
詳細については、 Unity Catalogドキュメントを参照してください。
# Specify the catalog and schema to use. You must have USE_CATALOG privilege on the catalog and USE_SCHEMA, CREATE_TABLE, and CREATE_MODEL privileges on the schema.
# Change the catalog and schema here if necessary.
CATALOG_NAME = "main"
SCHEMA_NAME = "default"
データを読み込み、 Unity Catalogのテーブルに保存する
データセットはdatabricks-datasetsで入手可能です。次のセルでは、 .csvファイルからデータを読み込み、 Spark DataFramesに格納します。 次に、 DataFrames Unity Catalogのテーブルに書き込みます。 これにより、データが永続的に保存されるだけでなく、他者との共有方法も制御できるようになります。
white_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=';', header=True)
red_wine = spark.read.csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=';', header=True)
# Remove the spaces from the column names
for c in white_wine.columns:
white_wine = white_wine.withColumnRenamed(c, c.replace(" ", "_"))
for c in red_wine.columns:
red_wine = red_wine.withColumnRenamed(c, c.replace(" ", "_"))
# Define table names
red_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine"
white_wine_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine"
# Write to tables in Unity Catalog
spark.sql(f"DROP TABLE IF EXISTS {red_wine_table}")
spark.sql(f"DROP TABLE IF EXISTS {white_wine_table}")
white_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine")
red_wine.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine")
データの前処理
# Import required libraries
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
import matplotlib.pyplot as plt
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
# Load data from Unity Catalog as Pandas dataframes
white_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.white_wine").toPandas()
red_wine = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.red_wine").toPandas()
# Add Boolean fields for red and white wine
white_wine['is_red'] = 0.0
red_wine['is_red'] = 1.0
data_df = pd.concat([white_wine, red_wine], axis=0)
# Define classification labels based on the wine quality
data_labels = data_df['quality'].astype('int') >= 7
data_df = data_df.drop(['quality'], axis=1)
# Split 80/20 train-test
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
data_df,
data_labels,
test_size=0.2,
random_state=1
)
パート 1. 分類モデルをトレーニングする
# Enable MLflow autologging for this notebook
mlflow.autolog()
次に、 MLflow実行のコンテキスト内で分類器をトレーニングします。これにより、トレーニングされたモデルと、関連する多くのメトリクスと問題が自動的に記録されます。
テスト データセットのモデルの AUC スコアなど、追加のメトリクスでログを補足できます。
with mlflow.start_run(run_name='gradient_boost') as run:
model = sklearn.ensemble.GradientBoostingClassifier(random_state=0)
# Models, parameters, and training metrics are tracked automatically
model.fit(X_train, y_train)
predicted_probs = model.predict_proba(X_test)
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
roc_curve = sklearn.metrics.RocCurveDisplay.from_estimator(model, X_test, y_test)
# Save the ROC curve plot to a file
roc_curve.figure_.savefig("roc_curve.png")
# The AUC score on test data is not automatically logged, so log it manually
mlflow.log_metric("test_auc", roc_auc)
# Log the ROC curve image file as an artifact
mlflow.log_artifact("roc_curve.png")
print("Test AUC of: {}".format(roc_auc))
MLflowの実行結果を表示する
記録されたトレーニング実行を表示するには、 「体験」 アイコンをクリックします。ノートブックの右上にある をクリックして、体験サイドバーを表示します。 必要に応じて、更新アイコンをクリックして最新の実行結果を取得および監視してください。
![]()
より詳細なMLflowエクスペリメント ページを表示するには、エクスペリメント ページ アイコンをクリックします。 このページでは、実行を比較し、特定の実行の詳細を表示できます。 MLflow を使用したトラックモデル開発を参照してください。
ロードモデル
MLflow API を使用すると、特定の実行結果にアクセスすることもできます。次のセル内のコードは、特定のMLflow実行でトレーニングされたモデルをロードし、それを使用して予測を行う方法を示しています。 MLflowの実行ページでは、特定のモデルを読み込むためのコードスニペットも見つけることができます。
# After a model has been logged, you can load it in different notebooks or jobs
# mlflow.pyfunc.load_model makes model prediction available under a common API
model_loaded = mlflow.pyfunc.load_model(
'runs:/{run_id}/model'.format(
run_id=run.info.run_id
)
)
predictions_loaded = model_loaded.predict(X_test)
predictions_original = model.predict(X_test)
# The loaded model should match the original
assert(np.array_equal(predictions_loaded, predictions_original))
パート 2. ハイパーチューニング
ここまでで、シンプルなモデルをトレーニングし、MLflowトラッキングサービスを使用して作業を整理しました。次に、Hyperopt を使用してより高度なチューニングを行うことができます。
HyperoptとSparkTrialsを用いた並列トレーニング
Hyperoptはハイパーチューニング用のPythonです。 DatabricksでのHyperoptの使用の詳細については、 Hyperoptで分散トレーニング アルゴリズムを使用する」を参照してください。
Hyperoptと SparkTrials を使用して、ハイパーパラメーター スイープを実行し、複数のモデルを並行してトレーニングすることができます。 これにより、モデルのパフォーマンス最適化に必要な時間が短縮されます。MLflow追跡はHyperoptと統合されており、自動的に記録済みモデルと問題を記録します。
# Define the search space to explore
search_space = {
'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
'learning_rate': hp.loguniform('learning_rate', -3, 0),
'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}
def train_model(params):
# Enable autologging on each worker
mlflow.autolog()
with mlflow.start_run(nested=True):
model_hp = sklearn.ensemble.GradientBoostingClassifier(
random_state=0,
**params
)
model_hp.fit(X_train, y_train)
predicted_probs = model_hp.predict_proba(X_test)
# Tune based on the test AUC
# In production, you could use a separate validation set instead
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
mlflow.log_metric('test_auc', roc_auc)
# Set the loss to -1*auc_score so fmin maximizes the auc_score
return {'status': STATUS_OK, 'loss': -1*roc_auc}
# SparkTrials distributes the tuning using Spark workers
# Greater parallelism speeds processing, but each hyperparameter trial has less information from other trials
# On smaller clusters try setting parallelism=2
spark_trials = SparkTrials(
parallelism=1
)
with mlflow.start_run(run_name='gb_hyperopt') as run:
# Use hyperopt to find the parameters yielding the highest AUC
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
max_evals=32,
trials=spark_trials)
最適なモデルを取得するために検索が実行されます
すべての実行はMLflowによって追跡されるため、 MLflow検索実行APIを使用してメトリクスと最良の実行の確保を取得し、最高のテスト実行を伴うチューニング実行を見つけることができます。
この調整済みモデルは、パート1で学習させたより単純なモデルよりも優れた性能を発揮するはずです。
# Sort runs by their test auc. In case of ties, use the most recent run.
best_run = mlflow.search_runs(
order_by=['metrics.test_auc DESC', 'start_time DESC'],
max_results=10,
).iloc[0]
print('Best Run')
print('AUC: {}'.format(best_run["metrics.test_auc"]))
print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))
best_model_pyfunc = mlflow.pyfunc.load_model(
'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
)
# Make a dataset with all predictions
best_model_predictions = X_test
best_model_predictions["prediction"] = best_model_pyfunc.predict(X_test)
パート3:結果とモデルをUnity Catalogに保存する
predictions_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions"
spark.sql(f"DROP TABLE IF EXISTS {predictions_table}")
results = spark.createDataFrame(best_model_predictions)
# Write results back to Unity Catalog from Python
results.write.saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.predictions")
model_uri = 'runs:/{run_id}/model'.format(
run_id=best_run.run_id
)
mlflow.register_model(model_uri, f"{CATALOG_NAME}.{SCHEMA_NAME}.wine_quality_model")
パート4. モデルのデプロイ
モデルをUnity Catalogに保存した後、Serving UIを使用してデプロイできます。 以下の説明は、その概要を示すものです。詳細については、 「カスタム モデルサービング エンドポイントの作成」を参照してください。
- サイドバーの サービング をクリックして、Serving UIを表示します。

-
サービングエンドポイントの作成 をクリックします。
-
「 名前」 フィールドにエンドポイントの名前を入力してください。
-
Served entities セクションで
- エンティティ フィールドをクリックして、 サービング済みエンティティの選択 フォームを開きます。
- 「マイモデル - Unity Catalog を選択してください。 選択内容に基づいてフォームが動的に更新されます。
- 提供する
wine_quality_modelとモデルバージョンを選択してください。 - サービス提供モデルにルーティングするトラフィックの割合として 100を 選択してください。
- この例では、コンピュート タイプとして CPU を選択します。
- [コンピュート スケールアウト] で、コンピュート スケール アウト サイズとして [小] を選択します。
-
「作成」 をクリックします。「 エンドポイントの提供」 ページが表示され、 「エンドポイントの提供」の状態 が「 準備完了ではありません」 と表示されます。
-
エンドポイントの 準備 が完了したら、 「使用」 を選択して、エンドポイントに推論リクエストを送信します。