Databricks レイクハウスモニタリングでカスタム メトリクスを使用する

このページでは、Databricks レイクハウスモニタリングでカスタムメトリクスを作成する方法について説明します。 自動的に計算される分析とドリフトの統計に加えて、カスタムメトリクスを作成できます。 たとえば、ビジネス ロジックの一部の側面をキャプチャする加重平均を追跡したり、カスタム モデルの品質スコアを使用したりできます。 また、プライマリ テーブルの値に対する変更を追跡するカスタム ドリフト メトリックを作成することもできます (ベースラインまたは前の時間枠と比較)。

MonitorMetric API の使用方法の詳細については、 API リファレンスを参照してください。

カスタムメトリクスの種類

Databricks レイクハウスモニタリングには、次の種類のカスタム メトリクスが含まれています。

  • 集計メトリクス: 主テーブルの列に基づいて計算されます。 集計メトリクスは、プロファイルメトリクステーブルに格納されます。

  • 派生メトリクス: 以前のコンピュート集計メトリクスに基づいて計算され、プライマリテーブルのデータを直接使用しません。 派生メトリクスは、プロファイルメトリクステーブルに格納されます。

  • ドリフトメトリクス: 2 つの異なる時間枠から、または主表とベースライン表の間で、以前にコンピュート集計または派生したメトリクスを比較します。 ドリフトメトリクスは、ドリフトメトリクステーブルに保存されます。

可能な場合は派生メトリックとドリフトメトリクスを使用すると、プライマリテーブル全体の再計算が最小限に抑えられます。 プライマリテーブルからのメトリクスアクセスデータのみを集計します。 派生およびドリフトメトリクスは、集計メトリクス値から直接計算することができます。

カスタムメトリクスパラメーター

カスタムメトリクスを定義するには、SQL カラム式の Jinja テンプレート を作成します。 このセクションの表では、メトリクスを定義するパラメーターと、Jinja テンプレートで使用されるパラメーターについて説明します。

パラメーター

説明

type

MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATEMonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED、または MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFTのいずれかです。

name

メトリクステーブル内のカスタムメトリクスのカラム名。

input_columns

メトリクスをコンピュートにする必要がある入力テーブル内の列名のリスト。 計算に複数の列が使用されることを示すには、 :tableを使用します。 この記事の例を参照してください。

definition

メトリックをコンピュートする方法を指定するSQLの Jinja テンプレート。 「 定義の作成」を参照してください。

output_data_type

Spark文字列形式でのメトリクス出力のJSON データ型。

definitionの作成

definition パラメーターは、Jinja テンプレート形式の 1 つの文字列式である必要があります。ジョインやサブクエリを含めることはできません。

次の表に、SQL Jinja テンプレートを作成してメトリクスの計算方法を指定するために使用できるパラメーターを示します。

パラメーター

説明

{{input_column}}

カスタムメトリクスをコンピュートするために使用される列。

{{prediction_col}}

機械学習モデルの予測を保持する列。 InferenceLog 分析で使用されます。

{{label_col}}

機械学習モデルのグラウンドトゥルースラベルを保持する列。 InferenceLog 分析で使用されます。

{{current_df}}

前の時間枠と比較したドリフト。 前の時間枠のデータ。

{{base_df}}

ベースライン テーブルと比較したドリフト。 ベースライン データ。

集計メトリクスの例

次の例は、列内の値の 2 乗の平均をコンピュートし、列 f1 と列 f2に適用されます。 出力は、プロファイルメトリクステーブルに新しい列として保存され、列 f1f2に対応する分析行に表示されます。 該当する列名は、Jinja パラメーター {{input_column}}に置き換えられます。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="squared_avg",
    input_columns=["f1", "f2"],
    definition="avg(`{{input_column}}`*`{{input_column}}`)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

次のコードは、列 f1f2の差の平均をコンピュートするカスタム メトリックを定義します。 この例では、 input_columns パラメーターで [":table"] を使用して、テーブルの複数の列が計算に使用されることを示します。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="avg_diff_f1_f2",
    input_columns=[":table"],
    definition="avg(f1 - f2)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

この例は、重み付けされたモデル品質スコアであるコンピュートである。 critical 列が Trueオブザベーションの場合、その行の予測値がグラウンドトゥルースと一致しない場合、より重いペナルティが割り当てられます。生の列 (prediction および label) で定義されているため、集計メトリクスとして定義されます。 :table 列は、このメトリクスが複数の列から計算されることを示します。Jinja パラメーター {{prediction_col}}{{label_col}} は、モニターの予測列とグラウンドトゥルースラベル列の名前に置き換えられます。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
    name="weighted_error",
    input_columns=[":table"],
    definition="""avg(CASE
      WHEN {{prediction_col}} = {{label_col}} THEN 0
      WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
      ELSE 1 END)""",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

派生メトリクスの例

次のコードは、このセクションで前に定義した squared_avg メトリックの平方根を計算するカスタム メトリックを定義します。 これは派生メトリクスであるため、プライマリテーブルデータを参照せず、代わりに squared_avg 集計メトリクスの観点から定義されます。 出力は、プロファイルメトリクステーブルに新しい列として保存されます。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
    name="root_mean_square",
    input_columns=["f1", "f2"],
    definition="sqrt(squared_avg)",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)

ドリフトメトリクスの例

次のコードは、このセクションで前に定義した weighted_error メトリクスの変更を追跡するドリフトメトリクスを定義します。 {{current_df}} パラメーターと {{base_df}} パラメーターを使用すると、メトリクスは現在のウィンドウと比較ウィンドウから weighted_error 値を参照できます。比較ウィンドウは、ベースライン データまたは前のタイム ウィンドウのデータのいずれかです。 ドリフトメトリクスは、ドリフトメトリクステーブルに保存されます。

from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T

MonitorMetric(
    type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
    name="error_rate_delta",
    input_columns=[":table"],
    definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
    output_data_type=T.StructField("output", T.DoubleType()).json(),
)