Monitor pipelines with the Delta Live Tables event log
An event log is created and maintained for every Delta Live Tables pipeline. The event log contains all information related to the pipeline, including audit logs, data quality checks, pipeline progress, and data lineage. You can use the event log to track, understand, and monitor the state of your data pipelines.
The event log for each pipeline is stored in a Delta table in DBFS. You can view event log entries in the Delta Live Tables user interface, retrieve them with the Delta Live Tables API, or query the Delta table directly. This article focuses on querying the Delta table.
The example notebook includes queries discussed in this article and can be used to explore the Delta Live Tables event log.
Requirements
The examples in this article use JSON SQL functions available in Databricks Runtime 8.1 or higher.
Event log location
The event log is stored in the /system/events path under the pipeline storage location. For example, if you have configured your pipeline storage setting as /Users/username/data, the event log is stored in the /Users/username/data/system/events path in DBFS.

If you have not configured the storage setting, the default event log location is /pipelines/<pipeline-id>/system/events in DBFS. For example, if the ID of your pipeline is 91de5e48-35ed-11ec-8d3d-0242ac130003, the event log is stored in the /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events path.
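As a minimal sketch, the following Python snippet shows one way to derive the event log path from these settings. The storage location and pipeline ID are the example values used above; substitute the values for your own pipeline:
# Minimal sketch: derive the event log path from the pipeline settings.
# The values below are the example values from this section.
storage_location = "/Users/username/data"  # the pipeline "storage" setting, or None if not configured
pipeline_id = "91de5e48-35ed-11ec-8d3d-0242ac130003"
if storage_location:
    event_log_path = f"{storage_location}/system/events"
else:
    event_log_path = f"/pipelines/{pipeline_id}/system/events"
print(event_log_path)  # /Users/username/data/system/events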
Event log schema
The following table describes the event log schema. Some of these fields contain JSON documents that require parsing to perform some queries. For example, analyzing data quality metrics requires parsing fields in the details JSON document. The examples in this article demonstrate using SQL functions to perform the required parsing.
| Field | Description |
| --- | --- |
| id | A unique identifier for the event log record. |
| sequence | A JSON document containing metadata to identify and order events. |
| origin | A JSON document containing metadata for the origin of the event, for example, the cloud provider, region, user_id, or pipeline_id. |
| timestamp | The time the event was recorded. |
| message | A human-readable message describing the event. |
| level | The event level, for example, INFO, WARN, ERROR, or METRICS. |
| error | If an error occurred, details describing the error. |
| details | A JSON document containing structured details of the event. This is the primary field used for analyzing events. |
| event_type | The event type. |
| maturity_level | The stability of the event schema. The possible values are STABLE, NULL, EVOLVING, and DEPRECATED. |
Event log queries
You can create a view to simplify querying the event log. The following example creates a temporary view called event_log_raw. This view is used in the examples that query event log records throughout this article:
event_log = spark.read.format('delta').load(event_log_path)
event_log.createOrReplaceTempView("event_log_raw")
Replace event_log_path with the event log location.
Each instance of a pipeline run is called an update. Some of the following queries extract information for the most recent update. Run the following commands to find the identifier for the most recent update and save it in the latest_update_id variable:
latest_update_id = spark.sql("SELECT origin.update_id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1").collect()[0].update_id
spark.conf.set('latest_update.id', latest_update_id)
Audit logging
You can use the event log to audit events, for example, user actions. Events containing information about user actions have the event type user_action. Information about the action is stored in the user_action object in the details field. Use the following query to construct an audit log of user events:
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|   | timestamp | action | user_name |
| --- | --- | --- | --- |
| 1 | 2021-05-20T19:36:03.517+0000 | START | |
| 2 | 2021-05-20T19:35:59.913+0000 | CREATE | |
| 3 | 2021-05-27T00:35:51.971+0000 | START | |
Lineage
You can see a visual representation of your pipeline graph in the Delta Live Tables user interface. You can also programmatically extract this information to perform tasks such as generating reports for compliance or tracking data dependencies across an organization; a Python sketch that does this follows the example results below. Events containing information about lineage have the event type flow_definition. The lineage information is stored in the flow_definition object in the details field. The fields in the flow_definition object contain the necessary information to infer the relationships between datasets:
SELECT details:flow_definition.output_dataset, details:flow_definition.input_datasets FROM event_log_raw WHERE event_type = 'flow_definition' AND origin.update_id = '${latest_update.id}'
|   | output_dataset | input_datasets |
| --- | --- | --- |
| 1 | customers | null |
| 2 | sales_orders_raw | null |
| 3 | sales_orders_cleaned | ["customers", "sales_orders_raw"] |
| 4 | sales_order_in_la | ["sales_orders_cleaned"] |
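To work with the lineage programmatically, for example to build a dependency report, a minimal Python sketch like the following collects the same query results into a mapping from each output dataset to its inputs. It assumes the event_log_raw view and the latest_update.id configuration set earlier in this article:
import json
# Minimal sketch: build an {output_dataset: [input_datasets]} mapping from
# the flow_definition events for the latest update.
lineage_rows = spark.sql("""
  SELECT
    details:flow_definition.output_dataset AS output_dataset,
    details:flow_definition.input_datasets AS input_datasets
  FROM event_log_raw
  WHERE event_type = 'flow_definition'
    AND origin.update_id = '${latest_update.id}'
""").collect()
dependencies = {}
for row in lineage_rows:
    # input_datasets is a JSON array string, or null for source datasets
    inputs = json.loads(row.input_datasets) if row.input_datasets else []
    dependencies[row.output_dataset] = inputs
print(dependencies)
# {'customers': [], 'sales_orders_raw': [],
#  'sales_orders_cleaned': ['customers', 'sales_orders_raw'],
#  'sales_order_in_la': ['sales_orders_cleaned']}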
Data quality
The event log captures data quality metrics based on the expectations defined in your pipelines. Events containing information about data quality have the event type flow_progress. When an expectation is defined on a dataset, the data quality metrics are stored in the flow_progress.data_quality.expectations object in the details field. The following example queries the data quality metrics for the last pipeline update:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw
WHERE
event_type = 'flow_progress'
AND origin.update_id = '${latest_update.id}'
)
GROUP BY
row_expectations.dataset,
row_expectations.name
|   | dataset | expectation | passing_records | failing_records |
| --- | --- | --- | --- | --- |
| 1 | sales_orders_cleaned | valid_order_number | 4083 | 0 |
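If a single pass-rate figure per expectation is more useful for reporting, a minimal variation of the same query, wrapped in Python, can compute it. This sketch again assumes the event_log_raw view and the latest_update.id configuration set earlier:
# Minimal sketch: compute a pass rate per expectation from the same
# flow_progress data quality metrics as the query above.
pass_rates = spark.sql("""
  SELECT
    row_expectations.dataset AS dataset,
    row_expectations.name AS expectation,
    SUM(row_expectations.passed_records)
      / (SUM(row_expectations.passed_records) + SUM(row_expectations.failed_records)) AS pass_rate
  FROM (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM event_log_raw
    WHERE event_type = 'flow_progress'
      AND origin.update_id = '${latest_update.id}'
  )
  GROUP BY row_expectations.dataset, row_expectations.name
""")
pass_rates.show()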
Backlog metrics
You can use the event log to query backlog metrics. Events containing information about backlog metrics have the event type flow_progress. Information about backlog metrics is stored in the flow_progress.metrics.backlog_bytes object in the details field. The following example queries backlog metrics for the last pipeline update:
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw
WHERE
event_type ='flow_progress'
AND origin.update_id = '${latest_update.id}'
Note
The backlog metrics may not be available depending on the pipeline’s data source type and Databricks Runtime version.
Databricks Enhanced Autoscaling events
The event log captures cluster resizes when Enhanced Autoscaling is enabled in your pipelines. Events containing information about Enhanced Autoscaling have the event type autoscale. The cluster resizing request information is stored in the autoscale object in the details field. The following example queries the Enhanced Autoscaling cluster resize requests for the last pipeline update:
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw
WHERE
event_type = 'autoscale'
AND origin.update_id = '${latest_update.id}'
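To summarize how many resize requests ended in each status during the latest update, a minimal Python sketch like the following uses only the details:autoscale.status field shown above and assumes the same event_log_raw view and latest_update.id configuration:
# Minimal sketch: count Enhanced Autoscaling resize requests by status
# for the latest update.
resize_summary = spark.sql("""
  SELECT
    details:autoscale.status AS status,
    COUNT(*) AS num_requests
  FROM event_log_raw
  WHERE event_type = 'autoscale'
    AND origin.update_id = '${latest_update.id}'
  GROUP BY details:autoscale.status
""")
resize_summary.show()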
Runtime information
You can view runtime information for a pipeline update, for example, the Databricks Runtime version for the update:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
|   | dbr_version |
| --- | --- |
| 1 | 11.0 |