メインコンテンツまでスキップ

APIを使用してデータプロファイリングを作成する

このページでは、 Databricks SDKを使用してDatabricksでデータプロファイリングを作成する方法と、 API呼び出しで使用されるすべてのパラメーターについて説明します。 REST APIを使用してデータプロファイリングを作成および管理することもできます。 参考情報については、データプロファイリングSDKリファレンスおよびREST APIリファレンスを参照してください。

Unity Catalogに登録されている任意の管理対象または外部Deltaテーブルにプロファイルを作成できます。 Unity Catalogメタストアでは、どのテーブルに対しても 1 つのプロファイルのみを作成できます。

注記

非推奨のquality_monitors APIについては、 quality_monitors API (非推奨) を使用したデータプロファイリングの作成」を参照してください。

要件

最新バージョンの API を使用するには、ノートブックの先頭で次のコマンドを使用して Python クライアントをインストールします。

Python
%pip install "databricks-sdk>=0.68.0"

ご使用の環境で Databricks SDK を使用するために認証するには、 「認証」を参照してください。

プロファイルの種類

プロファイルを作成するときは、次のいずれかのプロファイル タイプを選択します: TimeSeriesInferenceLog 、またはSnapshot 。このセクションでは、各オプションについて簡単に説明します。詳細については、 API リファレンスまたはREST API リファレンス を参照してください。

注記
  • 時系列または推論プロファイルを初めて作成すると、Databricks は作成前の 30 日間のデータのみを分析します。プロファイルが作成されると、すべての新しいデータが処理されます。
  • マテリアライズドビューで定義されたプロファイルは、増分処理をサポートしません。
ヒント

TimeSeriesおよびInferenceプロファイルの場合、テーブルで変更データフィード (CDF) を有効にすることがベスト プラクティスです。 CDF を有効にすると、更新ごとにテーブル全体を再処理するのではなく、新しく追加されたデータのみが処理されます。これにより、実行がより効率的になり、多くのテーブルに拡張する際のコストが削減されます。

TimeSeriesプロファイル

TimeSeriesプロファイルは、時間枠全体のデータ分布を比較します。TimeSeriesプロファイルの場合、以下を指定する必要があります。

  • タイムスタンプ列 ( timestamp_column )。タイムスタンプ列のデータ型は、 TIMESTAMPまたはto_timestamp PySpark 関数を使用してタイムスタンプに変換できる型である必要があります。
  • メトリクスを計算するためのgranularitiesのセット。 利用可能な粒度は次のとおりです。
    • 集約粒度_5分
    • 集約粒度30分
    • 集約粒度_1時間
    • 集約粒度_1日
    • 集約粒度_1_週
    • 集約粒度_2週間
    • 集約粒度_3_週
    • 集約粒度_4_週
    • 集約粒度_1_月
    • 集計粒度_1年
Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, TimeSeriesConfig, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
time_series=TimeSeriesConfig(
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]),
slicing_exprs=["type='Red'"]
)

info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)

InferenceLogプロファイル

InferenceLogプロファイルはTimeSeriesプロファイルに似ていますが、モデル品質のメトリクスも含まれています。 InferenceLogプロファイルは以下を使用します:

パラメーター

説明

problem_type

MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION または MonitorInferenceLogProblemType.PROBLEM_TYPE_REGRESSION

prediction_column

モデルの予測値を含む列。

timestamp_column

推論リクエストのタイムスタンプを含む列。

model_id_column

予測に使用されるモデルの ID を含む列。

granularities

ウィンドウ内のデータを時間にわたって分割する方法を決定します。使用可能な値については、 TimeSeriesプロファイルを参照してください。

label_column

(オプション) モデル予測のグラウンドトゥルースを含む列。

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, InferenceLogConfig, InferenceProblemType, AggregationGranularity, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
inference_log=InferenceLogConfig(
problem_type=InferenceProblemType.INFERENCE_PROBLEM_TYPE_CLASSIFICATION,
prediction_column="preds",
model_id_column="model_ver",
label_column="label", # optional
timestamp_column="ts",
granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
)

info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)

InferenceLogプロファイルの場合、スライスはmodel_id_colの個別の値に基づいて自動的に作成されます。

Snapshotプロファイル

TimeSeriesとは対照的に、 Snapshotテーブルの完全な内容が時間の経過とともにどのように変化するかをプロファイルします。メトリックはテーブル内のすべてのデータに対して計算され、プロファイルが更新されるたびにテーブルの状態を反映します。

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, DataProfilingStatus, RefreshState, Refresh

w = WorkspaceClient()

schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
table_id = table.table_id
table_object_type = "table"

config = DataProfilingConfig(
output_schema_id=schema.schema_id,
assets_dir=f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}",
snapshot=SnapshotConfig(),
slicing_exprs=["type='Red'"]
)

更新して結果を表示

更新履歴を確認するには、データプロファイリングが有効になっているDatabricksワークスペースを使用する必要があります。

メトリクス テーブルを更新するには、 create_refreshを使用します。 例えば:

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
run_info = w.data_quality.create_refresh(
object_type=table_object_type, object_id=table_id, refresh=Refresh(
object_type=table_object_type,
object_id=table_id,
)
)

ノートブックからcreate_refresh呼び出すと、メトリクス テーブルが作成または更新されます。 この計算はノートブックが接続されているクラスター上ではなく、サーバーレス コンピュート上で実行されます。 統計が更新されている間も、ノートブックでコマンドを実行し続けることができます。

メトリクス テーブルに保存される統計情報については、 「メトリクス テーブルの監視」を参照してください。メトリクス テーブルはUnity Catalogテーブルです。 ノートブックまたは SQL クエリ エクスプローラーでクエリを実行し、カタログ エクスプローラーで表示できます。

プロファイルに関連付けられているすべての更新の履歴を表示するには、 list_refreshesを使用します。

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)
)

キューに登録されている、実行中である、または終了した特定の実行のステータスを取得するには、 get_refreshを使用します。

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
it = w.data_quality.list_refresh(object_type=table_object_type, object_id=table_id)

run_info = next(it, None)
while run_info.state in (RefreshState.MONITOR_REFRESH_STATE_PENDING, RefreshState.MONITOR_REFRESH_STATE_RUNNING):
run_info = w.data_quality.get_refresh(object_type=table_object_type, object_id=table_id, refresh_id=run_info.refresh_id)
time.sleep(30)

プロファイル設定を表示

API get_monitorを使用してプロファイル設定を確認できます。

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.get_monitor(object_type="table", object_id=table.table_id)

スケジュール

スケジュールに基づいて実行するようにプロファイルを設定するには、 create_monitorscheduleパラメーターを使用します。

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorTimeSeries, MonitorCronSchedule

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
schedule=CronSchedule(
quartz_cron_expression="0 0 12 * * ?", # schedules a refresh every day at 12 noon
timezone_id="PST",
)
)

info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)

詳細については、 「cron 式」を参照してください。

通知

プロファイルの通知を設定するには、 create_monitornotificationsパラメーターを使用します。

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import Monitor, DataProfilingConfig, SnapshotConfig, NotificationSettings, NotificationDestination

w = WorkspaceClient()
schema = w.schemas.get(full_name=f"{catalog}.{schema}")
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")

config = DataProfilingConfig(
output_schema_id=schema.schema_id,
snapshot=SnapshotConfig(),
notification_settings=NotificationSettings(
# Notify the given email when a monitoring refresh fails or times out.
on_failure=NotificationDestination(
email_addresses=["your_email@domain.com"]
)
)
)

info = w.data_quality.create_monitor(
monitor=Monitor(
object_type="table",
object_id=table.table_id,
data_profiling_config=config,
),
)

イベント タイプ (例: 「on_failure」) ごとに最大 5 つの E メール アドレスがサポートされます。

メトリクステーブルへのアクセスを制御する

プロファイルによって作成されたメトリック テーブルとダッシュボードは、そのプロファイルを作成したユーザーが所有します。 Unity Catalog権限を使用して、メトリクス テーブルへのアクセスを制御できます。 ワークスペース内でダッシュボードを共有するには、ダッシュボードの右上にある [ 共有] ボタンを使用します。

プロファイルを削除する

プロファイルを削除するには:

Python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get(full_name=f"{catalog}.{schema}.{table_name}")
w.data_quality.delete_monitor(object_type="table", object_id=table.table_id)

このコマンドは、プロファイルによって作成されたプロファイル テーブルとダッシュボードを削除しません。これらのアセットは別のステップで削除するか、別の場所に保存する必要があります。

ノートブックの例

次のノートブックの例は、プロファイルを作成し、プロファイルを更新し、作成されたメトリクス テーブルを調べる方法を示しています。

ノートブックの例: 時系列プロファイル

このノートブックでは、 TimeSeriesタイプのプロファイルを作成する方法を説明します。

TimeSeriesプロファイルのサンプルノートブック

Open notebook in new tab

ノートブックの例: 推論プロファイル (回帰)

このノートブックでは、回帰問題のInferenceLog型プロファイルを作成する方法を説明します。

推論プロファイル回帰の例ノートブック

Open notebook in new tab

ノートブックの例: 推論プロファイル (分類)

このノートブックでは、分類問題に対してInferenceLogタイプのプロファイルを作成する方法を説明します。

推論プロファイル分類サンプルノートブック

Open notebook in new tab

ノートブックの例: スナップショットプロファイル

このノートブックでは、 Snapshotタイプのプロファイルを作成する方法を説明します。

スナップショットプロファイルのサンプルノートブック

Open notebook in new tab