Databricks レイクハウスモニタリング でカスタム メトリクスを使用する

プレビュー

この機能はパブリックプレビュー段階です。

このページでは、Databricks レイクハウスモニタリングでカスタムメトリクスを作成する方法について説明します。 自動的に計算される分析とドリフトの統計に加えて、カスタムメトリクスを作成できます。 たとえば、ビジネス ロジックの一部の側面をキャプチャする加重平均を追跡したり、カスタム モデルの品質スコアを使用したりできます。 また、プライマリ テーブルの値に対する変更を追跡するカスタム ドリフト メトリックを作成することもできます (ベースラインまたは前の時間枠と比較)。

databricks.lakehouse_monitoring.Metric API の使用方法の詳細については、 API リファレンスを参照してください。

カスタムメトリクス の種類

Databricks レイクハウスモニタリングには、次の種類のカスタム メトリクスが含まれています。

  • 集計メトリクス: 主テーブルの列に基づいて計算されます。 集計メトリクスは、プロファイルメトリクステーブルに格納されます。

  • 派生メトリクス: 以前のコンピュート集計メトリクスに基づいて計算され、プライマリテーブルのデータを直接使用しません。 派生メトリクスは、プロファイルメトリクステーブルに格納されます。

  • ドリフトメトリクス: 2 つの異なる時間枠から、または主表とベースライン表の間で、以前にコンピュート集計または派生したメトリクスを比較します。 ドリフトメトリクスは、ドリフトメトリクステーブルに保存されます。

可能な場合は派生メトリックとドリフトメトリクスを使用すると、プライマリテーブル全体の再計算が最小限に抑えられます。 プライマリテーブルからのメトリクスアクセスデータのみを集計します。 派生およびドリフトメトリクスは、集計メトリクス値から直接計算することができます。

カスタムメトリクスパラメーター

カスタムメトリクスを定義するには、SQL カラム式の Jinja テンプレート を作成します。 このセクションの表では、メトリクスを定義するパラメーターと、Jinja テンプレートで使用されるパラメーターについて説明します。

パラメーター

説明

type

aggregatederived、または driftのいずれかです。

name

メトリクステーブル内のカスタムメトリクスのカラム名。

input_columns

メトリクスをコンピュートにする必要がある入力テーブル内の列名のリスト。 計算に複数の列が使用されることを示すには、 :tableを使用します。 この記事の例を参照してください。

definition

メトリクスをコンピュートする方法を指定する SQL 式の Jinja テンプレート。 メトリクスの作成を参照してください。

output_data_type

メトリクス出力の Spark データ型。

definition の作成

definition パラメーターは、Jinja テンプレートの形式の単一の文字列式である必要があります。結合やサブクエリを含めることはできません。 複雑な定義を構築するには、Python ヘルパー関数を使用できます。

次の表に、SQL Jinja テンプレートを作成してメトリクスの計算方法を指定するために使用できるパラメーターを示します。

パラメーター

説明

{{input_column}}

カスタムメトリクスをコンピュートするために使用される列。

{{prediction_col}}

機械学習モデルの予測を保持する列。 InferenceLog 分析で使用されます。

{{label_col}}

機械学習モデルのグラウンドトゥルースラベルを保持する列。 InferenceLog 分析で使用されます。

{{current_df}}

前の時間枠と比較したドリフト。 前の時間枠のデータ。

{{base_df}}

ベースライン テーブルと比較したドリフト。 ベースライン データ。

集計メトリクス の例

次の例は、列内の値の 2 乗の平均をコンピュートし、列 f1 と列 f2に適用されます。 出力は、プロファイルメトリクステーブルに新しい列として保存され、列 f1f2に対応する分析行に表示されます。 該当する列名は、Jinja パラメーター {{input_column}}に置き換えられます。

from databricks import lakehouse_monitoring as lm
from pyspark.sql import types as T

lm.Metric(
  type="aggregate",
  name="squared_avg",
  input_columns=["f1", "f2"],
  definition="avg(`{{input_column}}`*`{{input_column}}`)",
  output_data_type=T.DoubleType()
  )

次のコードは、列 f1f2の差の平均をコンピュートするカスタム メトリックを定義します。 この例では、 input_columns パラメーターで [":table"] を使用して、テーブルの複数の列が計算に使用されることを示します。

from databricks import lakehouse_monitoring as lm
from pyspark.sql import types as T

lm.Metric(
  type="aggregate",
  name="avg_diff_f1_f2",
  input_columns=[":table"],
  definition="avg(f1 - f2)",
  output_data_type=T.DoubleType())

この例は、重み付けされたモデル品質スコアであるコンピュートである。 critical 列が Trueオブザベーションの場合、その行の予測値がグラウンドトゥルースと一致しない場合、より重いペナルティが割り当てられます。生の列 (prediction および label) で定義されているため、集計メトリクスとして定義されます。 :table 列は、このメトリクスが複数の列から計算されることを示します。Jinja パラメーター {{prediction_col}}{{label_col}} は、モニターの予測列とグラウンドトゥルースラベル列の名前に置き換えられます。

from databricks import lakehouse_monitoring as lm
from pyspark.sql import types as T

lm.Metric(
  type="aggregate",
  name="weighted_error",
  input_columns=[":table"],
  definition="""avg(CASE
    WHEN {{prediction_col}} = {{label_col}} THEN 0
    WHEN {{prediction_col}} != {{label_col}} AND critical=TRUE THEN 2
    ELSE 1 END)""",
  output_data_type=T.DoubleType()
)

派生メトリクスの例

次のコードは、このセクションで前に定義した squared_avg メトリックの平方根を計算するカスタム メトリックを定義します。 これは派生メトリクスであるため、プライマリテーブルデータを参照せず、代わりに squared_avg 集計メトリクスの観点から定義されます。 出力は、プロファイルメトリクステーブルに新しい列として保存されます。

from databricks import lakehouse_monitoring as lm
from pyspark.sql import types as T

lm.Metric(
  type="derived",
  name="root_mean_square",
  input_columns=["f1", "f2"],
  definition="sqrt(squared_avg)",
  output_data_type=T.DoubleType())

ドリフトメトリクスの例

次のコードは、このセクションで前に定義した weighted_error メトリクスの変更を追跡するドリフトメトリクスを定義します。 {{current_df}} パラメーターと {{base_df}} パラメーターを使用すると、メトリクスは現在のウィンドウと比較ウィンドウから weighted_error 値を参照できます。比較ウィンドウは、ベースライン データまたは前のタイム ウィンドウのデータのいずれかです。 ドリフトメトリクスは、ドリフトメトリクステーブルに保存されます。

from databricks import lakehouse_monitoring as lm
from pyspark.sql import types as T

lm.Metric(
  type="drift",
  name="error_rate_delta",
  input_columns=[":table"],
  definition="{{current_df}}.weighted_error - {{base_df}}.weighted_error",
  output_data_type=T.DoubleType()
)