API使用してデータプロファイリングを作成する
このページでは、 Databricks SDK使用してDatabricksでデータプロファイリングを作成する方法と、 API呼び出しで使用されるすべての弊害について説明します。 REST API使用してデータプロファイリングを作成および管理することもできます。 参考情報については、「データプロファイリングSDKリファレンス」および「 REST APIリファレンス」を参照してください。
Unity Catalogに登録されている任意の管理対象または外部Deltaテーブルにプロファイルを作成できます。 Unity Catalogメタストアでは、どのテーブルに対しても 1 つのプロファイルのみを作成できます。
要件
データプロファイリングAPI 、 databricks-sdk 0.28.0 以降に組み込まれています。 最新バージョンの API を使用するには、ノートブックの先頭で次のコマンドを使用して Python クライアントをインストールします。
%pip install "databricks-sdk>=0.28.0"
ご使用の環境で Databricks SDK を使用するために認証するには、 「認証」を参照してください。
プロフィールの種類
プロファイルを作成するときは、TimeSeries、InferenceLog、またはスナップショットのいずれかのプロファイル タイプを選択します。 このセクションでは、各オプションについて簡単に説明します。詳細については、 API リファレンスまたはREST API リファレンスを参照してください。
- 時系列または推論プロファイルを初めて作成すると、Databricks は作成前の 30 日間のデータのみを分析します。プロファイルが作成されると、すべての新しいデータが処理されます。
- マテリアライズドビューで定義されたプロファイルは、増分処理をサポートしません。
TimeSeriesおよびInferenceプロファイルの場合、テーブルで変更データフィード (CDF) を有効にすることがベスト プラクティスです。 CDF を有効にすると、更新ごとにテーブル全体を再処理するのではなく、新しく追加されたデータのみが処理されます。これにより、実行がより効率的になり、多くのテーブルに拡張する際のコストが削減されます。
TimeSeriesプロフィール
TimeSeriesプロファイルは、時間枠全体のデータ分布を比較します。TimeSeriesプロファイルの場合、以下を指定する必要があります。
- タイムスタンプ列 (
timestamp_col)。タイムスタンプ列のデータ型は、TIMESTAMPまたはto_timestampPySpark 関数を使用してタイムスタンプに変換できる型である必要があります。 - メトリクスを計算するための
granularitiesのセット。 使用可能な粒度は、「5 分」、「30 分」、「1 時間」、「1 日」、「1 週間」、「2 週間」、「3 週間」、「4 週間」、「1 か月」、「1 年」です。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries
w = WorkspaceClient()
w.quality_monitors.create(
table_name=f"{catalog}.{schema}.{table_name}",
assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
output_schema_name=f"{catalog}.{schema}",
time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"])
)
InferenceLogプロフィール
InferenceLogプロファイルはTimeSeriesプロファイルに似ていますが、モデル品質のメトリクスも含まれています。 InferenceLogプロファイルの場合、次の事項が必要です。
パラメーター | 説明 |
|---|---|
|
|
| モデルの予測値を含む列。 |
| 推論リクエストのタイムスタンプを含む列。 |
| 予測に使用されるモデルの ID を含む列。 |
| ウィンドウ内のデータを時間にわたって分割する方法を決定します。可能な値: 「5 分」、「30 分」、「1 時間」、「1 日」、「1 週間」、「2 週間」、「3 週間」、「4 週間」、「1 か月」、「1 年」。 |
オプションの「問題」もあります。
オプション | 説明 |
|---|---|
| モデル予測のグラウンドトゥルースを含む列。 |
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorInferenceLog, MonitorInferenceLogProblemType
w = WorkspaceClient()
w.quality_monitors.create(
table_name=f"{catalog}.{schema}.{table_name}",
assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
output_schema_name=f"{catalog}.{schema}",
inference_log=MonitorInferenceLog(
problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
prediction_col="preds",
timestamp_col="ts",
granularities=["30 minutes", "1 day"],
model_id_col="model_ver",
label_col="label", # optional
)
)
InferenceLog プロファイルの場合、 model_id_colの個別の値に基づいてスライスが自動的に作成されます。
Snapshotプロフィール
TimeSeriesとは対照的に、 Snapshotテーブルの完全な内容が時間の経過とともにどのように変化するかをプロファイルします。メトリックはテーブル内のすべてのデータに対して計算され、プロファイルが更新されるたびにテーブルの状態を反映します。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorSnapshot
w = WorkspaceClient()
w.quality_monitors.create(
table_name=f"{catalog}.{schema}.{table_name}",
assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
output_schema_name=f"{catalog}.{schema}",
snapshot=MonitorSnapshot()
)
更新して結果を表示
メトリクス テーブルを更新するには、 run_refreshを使用します。 例えば:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.quality_monitors.run_refresh(
table_name=f"{catalog}.{schema}.{table_name}"
)
ノートブックからrun_refresh呼び出すと、メトリクス テーブルが作成または更新されます。 この計算はノートブックが接続されているクラスター上ではなく、サーバーレス コンピュート上で実行されます。 統計が更新されている間も、ノートブックでコマンドを実行し続けることができます。
メトリクス テーブルに保存される統計情報については、 「メトリクス テーブルの監視」を参照してください。メトリクス テーブルはUnity Catalogテーブルです。 ノートブックまたは SQL クエリ エクスプローラーでクエリを実行し、カタログ エクスプローラーで表示できます。
プロファイルに関連付けられているすべての更新の履歴を表示するには、 list_refreshesを使用します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.quality_monitors.list_refreshes(
table_name=f"{catalog}.{schema}.{table_name}"
)
キューに登録されている、実行中である、または終了した特定の実行のステータスを取得するには、 get_refreshを使用します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.quality_monitors.run_refresh(table_name=f"{catalog}.{schema}.{table_name}")
w.quality_monitors.get_refresh(
table_name=f"{catalog}.{schema}.{table_name}",
refresh_id = run_info.refresh_id
)
キューに入っているか実行中の更新をキャンセルするには、 cancel_refreshを使用します。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
run_info = w.quality_monitors.run_refresh(table_name=f"{catalog}.{schema}.{table_name}")
w.quality_monitors.cancel_refresh(
table_name=f"{catalog}.{schema}.{table_name}",
refresh_id=run_info.refresh_id
)
プロフィール設定を表示
API get_monitorを使用してプロフィール設定を確認できます。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.quality_monitors.get(f"{catalog}.{schema}.{table_name}")
スケジュール
スケジュールに基づいて実行するようにプロファイルを設定するには、 create_monitorのschedule問題を使用します。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule
w = WorkspaceClient()
w.quality_monitors.create(
table_name=f"{catalog}.{schema}.{table_name}",
assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
output_schema_name=f"{catalog}.{schema}",
time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"]),
schedule=MonitorCronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)
詳細については、 「cron 式」を参照してください。
通知
プロファイルの通知を設定するには、 create_monitorのnotifications問題を使用します。
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorNotifications, MonitorDestination
w = WorkspaceClient()
w.quality_monitors.create(
table_name=f"{catalog}.{schema}.{table_name}",
assets_dir=f"/Workspace/Users/{user_email}/databricks_lakehouse_monitoring/{catalog}.{schema}.{table_name}",
output_schema_name=f"{catalog}.{schema}",
time_series=MonitorTimeSeries(timestamp_col=ts, granularities=["30 minutes"]),
notifications=MonitorNotifications(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=MonitorDestination(
email_addresses=["your_email@domain.com"]
)
)
)
イベント タイプ (例: 「on_failure」) ごとに最大 5 つの E メール アドレスがサポートされます。
メトリクステーブルへのアクセスを制御する
プロファイルによって作成されたメトリック テーブルとダッシュボードは、そのプロファイルを作成したユーザーが所有します。 Unity Catalog権限を使用して、メトリクス テーブルへのアクセスを制御できます。 ワークスペース内でダッシュボードを共有するには、ダッシュボードの右上にある [ 共有] ボタンを使用します。
プロフィールを削除する
プロファイルを削除するには:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.quality_monitors.delete(table_name=f"{catalog}.{schema}.{table_name}")
このコマンドは、プロファイルによって作成されたプロファイル テーブルとダッシュボードを削除しません。これらのアセットは別のステップで削除するか、別の場所に保存する必要があります。
ノートブックの例
次のノートブックの例は、プロファイルを作成し、プロファイルを更新し、作成されたメトリクス テーブルを調べる方法を示しています。
ノートブックの例: 時系列プロファイル
このノートブックでは、 TimeSeriesタイプのプロファイルを作成する方法を説明します。
TimeSeriesプロファイルのサンプルノートブック
ノートブックの例: 推論プロファイル (回帰)
このノートブックでは、回帰問題のInferenceLog型プロファイルを作成する方法を説明します。
推論プロファイル回帰の例ノートブック
ノートブックの例: 推論プロファイル (分類)
このノートブックでは、分類問題に対してInferenceLogタイプのプロファイルを作成する方法を説明します。
推論プロファイル分類サンプルノートブック
ノートブックの例: スナップショットプロファイル
このノートブックでは、 Snapshotタイプのプロファイルを作成する方法を説明します。