Use custom メトリクス with Databricks レイクハウスモニタリング
このページでは、Databricks レイクハウスモニタリングでカスタムメトリクスを作成する方法について説明します。 自動的に計算される分析とドリフトの統計に加えて、カスタムメトリクスを作成できます。 たとえば、ビジネス ロジックの一部の側面をキャプチャする加重平均を追跡したり、カスタム モデルの品質スコアを使用したりできます。 また、プライマリ テーブルの値に対する変更を追跡するカスタム ドリフト メトリックを作成することもできます (ベースラインまたは前の時間枠と比較)。
MonitorMetric
API の使用方法の詳細については、API リファレンスを参照してください。
カスタムメトリクスの種類
Databricks レイクハウスモニタリングには、次の種類のカスタム メトリクスが含まれています。
- Aggregate メトリクスは、プライマリテーブルの列に基づいて計算されます。 集計メトリクスは、プロファイル メトリクス テーブルに格納されます。
- Derived メトリクスは、以前のコンピュート集計メトリクスに基づいて計算され、プライマリテーブルのデータを直接使用しません。 派生したメトリクスは、プロファイル メトリクス テーブルに格納されます。
- Drift メトリクスは、2 つの異なる時間枠から、またはプライマリ テーブルとベースライン テーブルの間で、以前のコンピュート集計または派生したメトリクスを比較します。 ドリフトメトリクスは、ドリフトメトリクステーブルに保存されます。
可能な場合は派生メトリックとドリフトメトリクスを使用すると、プライマリテーブル全体の再計算が最小限に抑えられます。 プライマリテーブルからのメトリクスアクセスデータのみを集計します。 派生およびドリフトメトリクスは、集計メトリクス値から直接計算することができます。
Custom メトリクス パラメーター
カスタムメトリクスを定義するには、SQL カラム式の Jinja テンプレート を作成します。 このセクションの表では、メトリクスを定義するパラメーターと、Jinja テンプレートで使用されるパラメーターについて説明します。
パラメーター | 説明 |
---|---|
|
|
| メトリクステーブル内のカスタムメトリクスの列名。 |
| 入力テーブル内のカラム名のリストで、メトリクスをコンピュートにする必要があります。 計算で複数の列が使用されることを示すには、 |
| Jinjaテンプレートで、メトリクスをコンピュートする方法を指定する SQL 式です。 「定義の作成」を参照してください。 |
| Spark 、 JSON 文字列形式のメトリクス出力のデータ型。 |
創造する definition
definition
パラメーターは、Jinja テンプレート形式の 1 つの文字列式である必要があります。ジョインやサブクエリを含めることはできません。
次の表に、メトリクスの計算方法を指定するための SQL Jinja テンプレートを作成するために使用できるパラメーターを示します。
パラメーター | 説明 |
---|---|
| カスタムメトリクスをコンピュートするためのカラムです。 |
| ML モデルの予測を保持する列。 |
| ML モデルのグラウンド トゥルース ラベルを保持する列。 |
| 前の時間枠とのドリフトについて。 前のタイム ウィンドウのデータ。 |
| ベースライン テーブルと比較したドリフトの場合。 ベースライン データ。 |
Aggregate メトリクスの例
次の例は、列内の値の 2 乗の平均をコンピュートし、列 f1
と f2
に適用します。 出力は、プロファイル メトリクス テーブルの新しい列として保存され、列 f1
と f2
に対応する分析行に表示されます。 適用可能な列名は、Jinja パラメーター {{input_column}}
の代わりに使用されます。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="squared_avg",
input_columns=["f1", "f2"],
definition="avg(`{{input_column}}`*`{{input_column}}`)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
次のコードは、列 f1
と f2
の差の平均をコンピュートするカスタム メトリクスを定義しています。 この例では、input_columns
パラメーターで [":table"]
を使用して、テーブルの複数の列が計算に使用されていることを示しています。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="avg_diff_f1_f2",
input_columns=[":table"],
definition="avg(f1 - f2)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
この例では、加重モデルの品質スコアをコンピュートします。 critical
列がTrue
されている観測値では、その行の予測値がグラウンドトゥルースと一致しない場合、より重いペナルティが割り当てられます。生の列 (prediction
と label
) で定義されているため、集計メトリクスとして定義されています。 :table
列は、このメトリクスが複数の列から計算されることを示します。Jinja パラメーター {{prediction_col}}
と {{label_col}}
は、モニターの予測列とグラウンド トゥルース ラベル列の名前に置き換えられます。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE,
name="weighted_error",
input_columns=[":table"],
definition="""avg(CASE
WHEN {{prediction_col}} = {{label_col}} THEN 0
WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
ELSE 1 END)""",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
Derived メトリクス example
次のコードは、このセクションで前に定義した squared_avg
メトリクスの平方根をコンピュートするカスタム メトリクスを定義しています。 これは派生メトリクスであるため、プライマリ・テーブル・データを参照せず、代わりに squared_avg
集計メトリクスの観点から定義されます。 出力は、プロファイルメトリクステーブルの新しい列として保存されます。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED,
name="root_mean_square",
input_columns=["f1", "f2"],
definition="sqrt(squared_avg)",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)
Drift メトリクスの例
次のコードは、このセクションで前に定義した weighted_error
メトリクスの変化を追跡する drift メトリクスを定義しています。 {{current_df}}
パラメーターと {{base_df}}
パラメーターを使用すると、メトリクスは現在のウィンドウと比較ウィンドウからweighted_error
値を参照できます。比較ウィンドウは、ベースライン データまたは前のタイム ウィンドウのデータのいずれかです。 ドリフト メトリクスは、ドリフト メトリクス テーブルに保存されます。
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType
from pyspark.sql import types as T
MonitorMetric(
type=MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT,
name="error_rate_delta",
input_columns=[":table"],
definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
output_data_type=T.StructField("output", T.DoubleType()).json(),
)