    Data profiling example notebook: Snapshot analysis

    User requirements

    • You must have access to run commands on a cluster with access to Unity Catalog.
    • You must have USE CATALOG privilege on at least one catalog, and you must have USE SCHEMA privileges on at least one schema. This notebook creates tables in the main.default schema. If you do not have the required privileges on the main.default schema, you must edit the notebook to change the default catalog and schema to ones that you do have privileges on.

    System requirements

    • Your workspace must be enabled for Unity Catalog.
    • Databricks Runtime 12.2 LTS or above.
    • A Single user or Assigned cluster.

    This notebook illustrates how to create a snapshot profile, which calculates statistics over all data in the table each time the monitor is refreshed. If you want to calculate statistics separately for time windows within a table, use a TimeSeries monitor.
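    For comparison, the following is a minimal TimeSeries sketch, not part of the original notebook. It assumes a hypothetical table with a ts timestamp column and computes statistics in daily windows; uncomment and adapt the names to run it.

    # Hypothetical sketch: a TimeSeries monitor over a table with a `ts` timestamp column.
    # from databricks.sdk import WorkspaceClient
    # from databricks.sdk.service.catalog import MonitorTimeSeries
    #
    # w = WorkspaceClient()
    # w.quality_monitors.create(
    #   table_name="main.default.my_timeseries_table",  # hypothetical table name
    #   time_series=MonitorTimeSeries(timestamp_col="ts", granularities=["1 day"]),
    #   output_schema_name="main.default",
    #   assets_dir="/Workspace/Users/someone@example.com/databricks_quality_monitoring",  # hypothetical path
    # )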

    For more information about data profiling, see the documentation (AWS | Azure | GCP).

    Setup

    • Verify cluster configuration
    • Install the Python SDK
    • Define catalog, schema, and table names
    # Check the cluster configuration. If this cell fails, use the cluster selector at the top right of the notebook to select or configure a cluster running Databricks Runtime 12.2 LTS or above.
    import os
    
    assert float(os.environ.get("DATABRICKS_RUNTIME_VERSION", 0)) >= 12.2, "Please configure your cluster to use Databricks Runtime 12.2 LTS or above."
    %pip install "databricks-sdk>=0.28.0"
    # This step is necessary to reset the environment with our newly installed wheel.
    dbutils.library.restartPython()
    # You must have `USE CATALOG` privileges on the catalog, and you must have `USE SCHEMA` privileges on the schema.
    # If necessary, change the catalog and schema name here.
    
    CATALOG = "main"
    SCHEMA = "default"
    # Build a unique table-name suffix from the current user name to avoid collisions in the shared schema.
    username = spark.sql("SELECT current_user()").first()["current_user()"]
    username_prefixes = username.split("@")[0].split(".")
    unique_suffix = "_".join([username_prefixes[0], username_prefixes[1][0:2]])
    TABLE_NAME = f"{CATALOG}.{SCHEMA}.wine_{unique_suffix}"
    BASELINE_TABLE = f"{CATALOG}.{SCHEMA}.wine_baseline_{unique_suffix}"
    spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
    spark.sql(f"DROP TABLE IF EXISTS {BASELINE_TABLE}")

    User Journey

    1. Create tables: Read raw data and create the primary table (the table to be monitored) and the baseline table (which contains data known to meet expected quality standards).
    2. Create a monitor on the primary table and refresh it to collect monitor data.
    3. Inspect the metrics tables.
    4. Apply changes to table and refresh metrics. Inspect the metrics tables.
    5. [Optional] Delete the monitor.

    1. Create the primary and (optional) baseline tables in Unity Catalog

    • The tables must be Delta tables registered in Unity Catalog and owned by the user running the notebook.
    • The table to be monitored is also called the "primary table".
    • The baseline table must have the same schema as the monitored table. (A quick schema check appears after the tables are created below.)

    This example uses the winequality datasets.

    import pandas as pd
    
    
    white_wine = pd.read_csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
    red_wine = pd.read_csv("/databricks-datasets/wine-quality/winequality-red.csv", sep=";")
    
    # Add a categorical column for wine type
    white_wine["type"] = "white"
    red_wine["type"] = "red"
    data_pdf = pd.concat([white_wine, red_wine], axis=0)
    
    # Clean column names: replace spaces with underscores
    data_pdf.columns = data_pdf.columns.str.replace(" ", "_")
    data_df = spark.createDataFrame(data_pdf)
    baseline_df, primary_df = data_df.randomSplit(weights=[0.25, 0.75], seed=42)
    baseline_df.display()
    (baseline_df
     .write
     .format("delta")
     .mode("overwrite")
     .option("overwriteSchema", True)
     .saveAsTable(BASELINE_TABLE)
    )
    (primary_df
     .write
     .format("delta")
     .mode("overwrite")
     .option("overwriteSchema", True)
     .saveAsTable(TABLE_NAME)
    )
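    As a quick sanity check (not part of the original notebook), confirm that the baseline and primary tables were written with identical schemas, since the monitor requires this:

    # Verify that the baseline table schema matches the primary table schema.
    assert spark.table(BASELINE_TABLE).schema == spark.table(TABLE_NAME).schema, \
      "Baseline and primary table schemas differ"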

    2. Create monitor

    This notebook illustrates Snapshot type analysis. For other types of analysis, see the data profiling documentation (AWS | Azure | GCP).

    Before creating the monitor, drop any columns that should be excluded from monitoring for business or use-case reasons; a sketch of one approach follows.
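    The following is a hypothetical sketch, not part of the original notebook: customer_id stands in for a column you want to exclude. It materializes a copy of the table without that column, which you could then monitor instead of the original.

    # Hypothetical sketch: exclude a column from monitoring by writing a copy
    # of the table without it ("customer_id" is a stand-in column name).
    # monitored_df = spark.table(TABLE_NAME).drop("customer_id")
    # monitored_df.write.format("delta").mode("overwrite").saveAsTable(f"{TABLE_NAME}_monitored")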

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.catalog import MonitorSnapshot, MonitorInfoStatus, MonitorRefreshInfoState, MonitorMetric
    
    w = WorkspaceClient()
    # Expressions to slice the data with. Metrics are computed separately for each slice, in addition to the table as a whole.
    SLICING_EXPRS = ["type='red'"]
    
    # Directory to store generated dashboard
    ASSETS_DIR = f"/Workspace/Users/{username}/databricks_quality_monitoring/{TABLE_NAME}"

    You can access documentation for SDK commands as shown in the following cell.

    help(w.quality_monitors.create)

    Create the monitor

    print(f"Creating monitor for {TABLE_NAME}")
    
    info = w.quality_monitors.create(
      table_name=TABLE_NAME,
      snapshot=MonitorSnapshot(),
      baseline_table_name=BASELINE_TABLE,
      slicing_exprs=SLICING_EXPRS,
      output_schema_name=f"{CATALOG}.{SCHEMA}",
      assets_dir=ASSETS_DIR
    )
    import time
    
    
    # Wait for monitor to be created
    while info.status == MonitorInfoStatus.MONITOR_STATUS_PENDING:
      info = w.quality_monitors.get(table_name=TABLE_NAME)
      time.sleep(10)
    
    assert info.status == MonitorInfoStatus.MONITOR_STATUS_ACTIVE, "Error creating monitor"
    # A metric refresh will automatically be triggered on creation
    refreshes = w.quality_monitors.list_refreshes(table_name=TABLE_NAME).refreshes
    assert len(refreshes) > 0, "Expected at least one refresh after monitor creation"
    
    run_info = refreshes[0]
    while run_info.state in (MonitorRefreshInfoState.PENDING, MonitorRefreshInfoState.RUNNING):
      run_info = w.quality_monitors.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
      time.sleep(30)
    
    assert run_info.state == MonitorRefreshInfoState.SUCCESS, "Monitor refresh failed"

    To view the dashboard, click Dashboards in the left nav bar.

    You can also navigate to the dashboard from the primary table in the Catalog Explorer UI. On the Quality tab, click the View dashboard button.

    For details, see the documentation (AWS | Azure | GCP).

    w.quality_monitors.get(table_name=TABLE_NAME)

    3. Inspect the metric tables

    The metric tables are saved in the schema specified as output_schema_name when the monitor was created (in this notebook, main.default).

    The quality_monitors.create call created two new tables: the profile metrics table and the drift metrics table.

    These two tables record the outputs of analysis jobs. The tables use the same name as the primary table to be monitored, with the suffixes _profile_metrics and _drift_metrics.

    Orientation to the profile metrics table

    The profile metrics table has the suffix _profile_metrics. For a list of statistics that are shown in the table, see the documentation (AWS | Azure | GCP).

    • For every column in the primary table, the analysis table shows summary statistics for the baseline table and for the primary table. The column log_type shows INPUT to indicate statistics for the primary table, and BASELINE to indicate statistics for the baseline table. The column from the primary table is identified in the column column_name.
    • For snapshot type analysis, the granularity column is always exact, because the snapshot table does not include a timestamp. exact indicates that the statistics shown in the table correspond to the exact time that the analysis was run.
    • The table shows statistics for each value of each slice key, and for the table as a whole. Statistics for the table as a whole are indicated by slice_key = slice_value = null.
    • The window column shows the time that the analysis was run for primary table statistics. For baseline table statistics, the window column shows null.
    • Some statistics are calculated based on the table as a whole, not on a single column. In the column column_name, these statistics are identified by :table.
    # Display profile metrics table
    profile_table = f"{TABLE_NAME}_profile_metrics"
    display(spark.sql(f"SELECT * FROM {profile_table}"))
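    Building on the column descriptions above, the following illustrative query (not part of the original notebook) narrows the output to per-column statistics computed over the whole primary table:

    # Show per-column statistics for the primary table, without slices.
    display(spark.sql(f"""
      SELECT *
      FROM {profile_table}
      WHERE log_type = 'INPUT'      -- primary table rows (vs. BASELINE)
        AND slice_key IS NULL       -- whole-table statistics, no slices
        AND column_name != ':table' -- per-column rather than table-wide statistics
    """))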

    Orientation to the drift metrics table

    The drift metrics table has the suffix _drift_metrics. For a list of statistics that are shown in the table, see the documentation (AWS | Azure | GCP).

    • For every column in the primary table, the drift table shows a set of metrics that compare the current values in the table to the values at the time of the previous analysis run and to the baseline table. The column drift_type shows BASELINE to indicate drift relative to the baseline table, and CONSECUTIVE to indicate drift relative to the previous analysis. As in the profile table, the column from the primary table is identified in the column column_name.
    • For snapshot type analysis, the granularity column is always exact, because the snapshot table does not include a timestamp. exact indicates that the statistics shown in the table correspond to the exact time that the analysis was run.
    • The table shows statistics for each value of each slice key, and for the table as a whole. Statistics for the table as a whole are indicated by slice_key = slice_value = null.
    • The window column shows the time that the analysis was run. The window_cmp column shows the time that the current analysis is being compared to. If the comparison is to the baseline table, window_cmp is null.
    • Some statistics are calculated based on the table as a whole, not on a single column. In the column column_name, these statistics are identified by :table.
    # Display the drift metrics table
    drift_table = f"{TABLE_NAME}_drift_metrics"
    display(spark.sql(f"SELECT * FROM {drift_table}"))
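    As an illustrative counterpart (not part of the original notebook), this query isolates drift relative to the baseline table, for the table as a whole:

    # Show baseline-relative drift without slices.
    display(spark.sql(f"""
      SELECT *
      FROM {drift_table}
      WHERE drift_type = 'BASELINE' -- drift vs. the baseline table (vs. CONSECUTIVE)
        AND slice_key IS NULL       -- whole-table statistics, no slices
    """))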

    4. Refresh metrics after changes to the table data or schema

    4.1 Change some values in the table

    The command in the following cell changes some values in the table. For any row where the value of the alcohol column is less than 9.0, the value is replaced by null.

    spark.sql(f"UPDATE {TABLE_NAME} SET alcohol = null WHERE alcohol < 9")

    Refresh metrics

    run_info = w.quality_monitors.run_refresh(table_name=TABLE_NAME)
    while run_info.state in (MonitorRefreshInfoState.PENDING, MonitorRefreshInfoState.RUNNING):
      run_info = w.quality_monitors.get_refresh(table_name=TABLE_NAME, refresh_id=run_info.refresh_id)
      time.sleep(30)
    
    assert run_info.state == MonitorRefreshInfoState.SUCCESS, "Monitor refresh failed"

    Open the monitoring dashboard to see the changes.

    w.quality_monitors.get(table_name=TABLE_NAME)
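    You can also see the change directly in the metrics. The following sketch (not part of the original notebook) assumes the profile metrics table includes a null_count statistic, as described in the documentation, and compares it across analysis windows for the alcohol column:

    # Compare recorded null counts for the alcohol column across analysis windows.
    display(spark.sql(f"""
      SELECT window, null_count
      FROM {profile_table}
      WHERE column_name = 'alcohol'
        AND log_type = 'INPUT'
        AND slice_key IS NULL
      ORDER BY window.start
    """))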

    5. [Optional] Delete the monitor

    Uncomment the following line of code to clean up the monitor. Only a single monitor can exist for a table.

    # w.quality_monitors.delete(table_name=TABLE_NAME)
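    Note that deleting the monitor does not drop the generated metric tables or the dashboard. For a full cleanup, remove the metric tables separately (uncomment to run):

    # The profile and drift metrics tables are not deleted with the monitor.
    # spark.sql(f"DROP TABLE IF EXISTS {profile_table}")
    # spark.sql(f"DROP TABLE IF EXISTS {drift_table}")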