Monitor pipelines with the Delta Live Tables event log

An event log is created and maintained for every Delta Live Tables pipeline. The event log contains all information related to the pipeline, including audit logs, data quality checks, pipeline progress, and data lineage. You can use the event log to track, understand, and monitor the state of your data pipelines.

The event log for each pipeline is stored in a Delta table in DBFS. You can view event log entries in the Delta Live Tables user interface, retrieve them with the Delta Live Tables API, or query the Delta table directly. This article focuses on querying the Delta table.

The example notebook includes queries discussed in this article and can be used to explore the Delta Live Tables event log.

Requirements

The examples in this article use JSON SQL functions available in Databricks Runtime 8.1 or higher.

Event log location

The event log is stored in /system/events under the storage location. For example, if you have configured your pipeline storage setting as /Users/username/data, the event log is stored in the /Users/username/data/system/events path in DBFS.

If you have not configured the storage setting, the default event log location is /pipelines/<pipeline-id>/system/events in DBFS. For example, if the ID of your pipeline is 91de5e48-35ed-11ec-8d3d-0242ac130003, the storage location is /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.
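To verify that the event log exists at the expected path, you can query the Delta table directly by its location. The following is a minimal sketch that uses the example pipeline ID above; substitute the event log path for your own pipeline:

SELECT id, timestamp, event_type, level
FROM delta.`/pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events`
ORDER BY timestamp DESC
LIMIT 10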

Event log schema

The following fields make up the event log schema. Some of these fields contain JSON documents that must be parsed to perform some queries. For example, analyzing data quality metrics requires parsing fields in the details JSON document. The examples in this article use Spark SQL functions to perform the required parsing.

• id: A unique identifier for the pipeline.
• sequence: A JSON document containing metadata to identify and order events.
• origin: A JSON document containing metadata for the origin of the event, for example, the cloud provider, region, user_id, or pipeline_id.
• timestamp: The time the event was recorded.
• message: A human-readable message describing the event.
• level: The event type, for example, INFO, WARN, ERROR, or METRICS.
• error: If an error occurred, details describing the error.
• details: A JSON document containing structured details of the event. This is the primary field used for analyzing events.
• event_type: The event type.
• maturity_level: The stability of the event schema. The possible values are:
  • STABLE: The schema is stable and will not change.
  • EVOLVING: The schema is not stable and may change.
  • DEPRECATED: The schema is deprecated and the Delta Live Tables runtime may stop producing this event at any time.

Event log queries

You can create a view to simplify querying the event log. The following example loads the event log and creates a temporary view named event_log_raw. This view is used in the examples throughout the rest of this article:

# Load the event log Delta table and register it as a temporary view for SQL queries
event_log = spark.read.format('delta').load(event_log_path)
event_log.createOrReplaceTempView("event_log_raw")

Replace event_log_path with the event log location.
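To confirm that the view exposes the fields listed in the event log schema, you can describe it; the exact output depends on your Databricks Runtime version:

DESCRIBE TABLE event_log_raw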

Each instance of a pipeline run is called an update. Some of the following queries extract information for the most recent update. Run the following commands to find the identifier for the most recent update, save it in the latest_update_id variable, and make it available to SQL queries as ${latest_update.id}:

latest_update_id = spark.sql("SELECT origin.update_id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1").collect()[0].update_id
spark.conf.set('latest_update.id', latest_update_id)

Audit logging

You can use the event log to audit events, for example, user actions. Events containing information about user actions have the event type user_action. Information about the action is stored in the user_action object in the details field. Use the following query to construct an audit log of user events:

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp                      action   user_name
2021-05-20T19:36:03.517+0000   START    user@company.com
2021-05-20T19:35:59.913+0000   CREATE   user@company.com
2021-05-27T00:35:51.971+0000   START    user@company.com
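The same user_action events can also be aggregated, for example to count how often each user performed each action. The following is a sketch that reuses the fields from the query above:

SELECT
  details:user_action:user_name AS user_name,
  details:user_action:action AS action,
  COUNT(*) AS occurrences
FROM event_log_raw
WHERE event_type = 'user_action'
GROUP BY 1, 2
ORDER BY occurrences DESC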

Lineage

You can see a visual representation of your pipeline graph in the Delta Live Tables user interface. You can also programmatically extract this information to perform tasks such as generating reports for compliance or tracking data dependencies across an organization. Events containing information about lineage have the event type flow_definition. The lineage information is stored in the flow_definition object in the details field. The fields in the flow_definition object contain the information needed to infer the relationships between datasets:

SELECT details:flow_definition.output_dataset, details:flow_definition.input_datasets FROM event_log_raw WHERE event_type = 'flow_definition' AND origin.update_id = '${latest_update.id}'

output_dataset         input_datasets
customers              null
sales_orders_raw       null
sales_orders_cleaned   ["customers", "sales_orders_raw"]
sales_order_in_la      ["sales_orders_cleaned"]
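Because input_datasets is a JSON array stored as a string, you can explode it to produce one row per dataset-to-dataset edge, which is convenient when loading the lineage into a graph or report. The following sketch reuses the fields from the query above; note that explode drops rows where input_datasets is null, such as source datasets:

SELECT
  explode(
    from_json(details:flow_definition.input_datasets, "array<string>")
  ) AS input_dataset,
  details:flow_definition.output_dataset AS output_dataset
FROM event_log_raw
WHERE event_type = 'flow_definition'
  AND origin.update_id = '${latest_update.id}'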

Data quality

The event log captures data quality metrics based on the expectations defined in your pipelines. Events containing information about data quality have the event type flow_progress. When an expectation is defined on a dataset, the data quality metrics are stored in the details field in the flow_progress.data_quality.expectations object. The following example queries the data quality metrics for the last pipeline update:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = '${latest_update.id}'
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name

dataset                expectation          passing_records   failing_records
sales_orders_cleaned   valid_order_number   4083              0
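To focus on problem areas, you can filter the same parsed expectation metrics to only those expectations that recorded failing records during the last update. The following is a sketch that reuses the parsing logic from the query above:

SELECT
  row_expectations.dataset AS dataset,
  row_expectations.name AS expectation,
  SUM(row_expectations.failed_records) AS failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM event_log_raw
    WHERE event_type = 'flow_progress'
      AND origin.update_id = '${latest_update.id}'
  )
GROUP BY row_expectations.dataset, row_expectations.name
HAVING SUM(row_expectations.failed_records) > 0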

Backlog metrics

You can use the event log to query backlog metrics. Events containing information about backlog metrics have the event type flow_progress. Information about backlog metrics is stored in the flow_progress.metrics.backlog_bytes object in the details field. The following example queries backlog metrics for the last pipeline update:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw
WHERE
  event_type = 'flow_progress'
  AND origin.update_id = '${latest_update.id}'

Note

The backlog metrics may not be available depending on the pipeline’s data source type and Databricks Runtime version.
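When backlog metrics are reported, a simple aggregate such as the maximum backlog observed during the update can serve as a rough health indicator. The following sketch uses the same field as the query above and returns null if no backlog metrics were recorded:

SELECT
  MAX(Double(details:flow_progress.metrics.backlog_bytes)) AS max_backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
  AND origin.update_id = '${latest_update.id}'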

Databricks Enhanced Autoscaling events

The event log captures cluster resizes when Enhanced Autoscaling is enabled in your pipelines. Events containing information about Enhanced Autoscaling have the event type autoscale. The cluster resize request information is stored in the autoscale object in the details field. The following example queries the Enhanced Autoscaling cluster resize requests for the last pipeline update:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw
WHERE
  event_type = 'autoscale'
  AND origin.update_id = '${latest_update.id}'
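To summarize how the resize requests resolved, you can count autoscale events by status. The following sketch uses the same status values referenced in the query above:

SELECT
  details:autoscale.status AS status,
  COUNT(*) AS num_requests
FROM event_log_raw
WHERE event_type = 'autoscale'
  AND origin.update_id = '${latest_update.id}'
GROUP BY 1
ORDER BY num_requests DESC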

Runtime information

You can view runtime information for a pipeline update, for example, the Databricks Runtime version for the update:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version
11.0

Example notebook

Querying the Delta Live Tables event log
