logeventos do pipeline declarativoLakeFlow
O log de eventos declarativo do pipeline LakeFlow contém todas as informações relacionadas a um pipeline, incluindo logs de auditoria, verificações de qualidade de dados, progresso pipeline e linhagem de dados. Você pode usar o log de eventos para rastrear, entender e monitorar o estado do seu pipeline de dados.
Você pode view entradas log de eventos na interface do usuário do pipeline declarativo LakeFlow , na API do pipeline declarativo LakeFlow ou consultando diretamente o log de eventos. Esta seção se concentra na consulta direta do log de eventos.
Você também pode definir ações personalizadas para execução quando eventos são registrados, por exemplo, envio de alertas, com ganchos de eventos.
Não exclua o log de eventos nem o catálogo ou esquema pai onde o log de eventos é publicado. Excluir o log de eventos pode fazer com que seu pipeline não seja atualizado durante execuções futuras.
Para obter detalhes completos sobre o esquema log eventos, consulte Esquema log eventos do pipeline declarativoLakeFlow.
Consultar o log de eventos
Esta seção descreve o comportamento e a sintaxe default para trabalhar com logs de eventos para pipeline configurado com Unity Catalog e o modo de publicação default .
- Para saber o comportamento do pipeline Unity Catalog que usa o modo de publicação legado, consulte Trabalhar com log de eventos para o pipeline do modo de publicação legado Unity Catalog.
- Para comportamento e sintaxe do pipeline Hive metastore , consulte Trabalhar com log de eventos para o pipeline Hive metastore.
Por default, o pipeline declarativo LakeFlow grava o log de eventos em uma tabela Delta oculta no catálogo e esquema default configurados para o pipeline. Embora oculta, a tabela ainda pode ser consultada por todos os usuários com privilégios suficientes. Por default, somente o proprietário do pipeline pode consultar a tabela log eventos.
Para consultar o log de eventos como proprietário, use o ID do pipeline:
SELECT * FROM event_log(<pipelineId>);
Por default, o nome do log de eventos ocultos é formatado como event_log_{pipeline_id}
, onde o ID pipeline é o UUID atribuído pelo sistema com traços substituídos por sublinhado.
Você pode publicar o log de eventos editando as configurações avançadas do seu pipeline. Para obter detalhes, consulte Configuração de pipeline para log de eventos. Ao publicar um log de eventos, especifique o nome do log de eventos e, opcionalmente, especifique um catálogo e um esquema, como no exemplo a seguir:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
O local do log de eventos também serve como local do esquema para quaisquer consultas do Auto Loader no pipeline. Databricks recomenda criar uma view sobre a tabela log de eventos antes de modificar os privilégios, pois algumas configurações compute podem permitir que os usuários obtenham acesso aos metadados do esquema se a tabela log de eventos for compartilhada diretamente. A sintaxe de exemplo a seguir cria uma view em uma tabela log de eventos e é usada nas consultas log eventos de exemplo incluídas neste artigo. Substitua <catalog_name>.<schema_name>.<event_log_table_name>
pelo nome da tabela totalmente qualificado do log de eventos do seu pipeline. Se você publicou o log de eventos, use o nome especificado durante a publicação. Caso contrário, use event_log(<pipelineId>)
onde pipelineId é o ID do pipeline que você deseja consultar.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
No Unity Catalog, visualize consultas de transmissão de suporte. O exemplo a seguir usa transmissão estruturada para consultar uma view definida em cima de uma tabela log eventos:
df = spark.readStream.table("event_log_raw")
Exemplos básicos de consulta
Os exemplos a seguir mostram como consultar o log eventos para obter informações gerais sobre o pipeline e ajudar a depurar cenários comuns.
Monitore atualizações de pipeline consultando atualizações anteriores
O exemplo a seguir consulta as atualizações (ou execução ) do seu pipeline, mostrando o ID da atualização, o status, o horário de início, o horário de conclusão e a duração. Isso lhe dá uma visão geral da execução do pipeline.
Assume que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Depurar problemas refresh incremental view materializada
Este exemplo consulta todos os fluxos da atualização mais recente de um pipeline. Ele mostra se eles foram atualizados incrementalmente ou não, bem como outras informações de planejamento relevantes que são úteis para entender por que uma refresh incremental não acontece.
Assume que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Consultar o custo de uma atualização de pipeline
Este exemplo mostra como consultar o uso de DBU para um pipeline, bem como o usuário para uma determinada execução de pipeline.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Consultas avançadas
Os exemplos a seguir mostram como consultar o log de eventos para lidar com cenários menos comuns ou mais avançados.
Métricas de consulta para todos os fluxos em um pipeline
Este exemplo mostra como consultar informações detalhadas sobre cada fluxo em um pipeline. Ele mostra o nome do fluxo, a duração da atualização, as métricas de qualidade dos dados e informações sobre as linhas processadas (linhas de saída, registros excluídos, inseridos e descartados).
Assume que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Consultar métricas de qualidade de dados ou expectativas
Se você definir expectativas em conjuntos de dados em seu pipeline, as métricas para o número de registros que passaram e falharam em uma expectativa serão armazenadas no objeto details:flow_progress.data_quality.expectations
. A métrica para o número de registros descartados é armazenada no objeto details:flow_progress.data_quality
. Eventos que contêm informações sobre qualidade de dados têm o tipo de evento flow_progress
.
Métricas de qualidade de dados podem não estar disponíveis para alguns conjuntos de dados. Veja as limitações de expectativa.
As seguintes métricas de qualidade de dados estão disponíveis:
Métrica | Descrição |
---|---|
| O número de registros que foram descartados porque não atenderam a uma ou mais expectativas. |
| O número de registros que passaram nos critérios de expectativa. |
| O número de registros que não atenderam aos critérios de expectativa. |
O exemplo a seguir consulta as métricas de qualidade de dados para a última atualização do pipeline. Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Consultar informações de linhagem
Eventos que contêm informações sobre linhagem têm o tipo de evento flow_definition
. O objeto details:flow_definition
contém output_dataset
e input_datasets
que definem cada relacionamento no gráfico.
Use a consulta a seguir para extrair os conjuntos de dados de entrada e saída para ver informações de linhagem. Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Monitore a ingestão de arquivos cloud com Auto Loader
O pipeline declarativo LakeFlow gera eventos quando Auto Loader processa arquivos. Para eventos do Auto Loader, o event_type
é operation_progress
e o details:operation_progress:type
é AUTO_LOADER_LISTING
ou AUTO_LOADER_BACKFILL
. O objeto details:operation_progress
também inclui os campos status
, duration_ms
, auto_loader_details:source_path
e auto_loader_details:num_files_listed
.
O exemplo a seguir consulta eventos do Auto Loader para obter a atualização mais recente. Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Monitorar o backlog de dados para otimizar a transmissão duração
O pipeline declarativo LakeFlow rastreia a quantidade de dados presentes no backlog no objeto details:flow_progress.metrics.backlog_bytes
. Eventos que contêm métricas de backlog têm o tipo de evento flow_progress
. O exemplo a seguir consulta métricas de backlog para a última atualização do pipeline. Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
As métricas de backlog podem não estar disponíveis dependendo do tipo de fonte de dados do pipeline e da versão Databricks Runtime .
Monitore eventos de dimensionamento automático para otimizar computeclássica
Para o pipeline declarativo LakeFlow que usa compute clássica (em outras palavras, não usa compute serverless ), o log de eventos captura redimensionamentos cluster quando o dimensionamento automático aprimorado está habilitado no seu pipeline. Eventos que contêm informações sobre dimensionamento automático aprimorado têm o tipo de evento autoscale
. As informações da solicitação de redimensionamento cluster são armazenadas no objeto details:autoscale
.
O exemplo a seguir consulta as solicitações de redimensionamento cluster de dimensionamento automático aprimorado para a última atualização pipeline . Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Monitorar a utilização de recursos de compute para computeclássica
cluster_resources
Os eventos fornecem métricas sobre o número de slots de tarefas no cluster, o quanto esses slots de tarefas são usados e quantas tarefas estão aguardando para serem agendadas.
Quando o dimensionamento automático aprimorado está habilitado, os eventos cluster_resources
também contêm métricas para o algoritmo de dimensionamento automático, incluindo latest_requested_num_executors
e optimal_num_executors
. Os eventos também mostram o status do algoritmo como estados diferentes, como CLUSTER_AT_DESIRED_SIZE
, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
e BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
.
Essas informações podem ser visualizadas em conjunto com os eventos de dimensionamento automático para fornecer uma visão geral do dimensionamento automático aprimorado.
O exemplo a seguir consulta a tarefa queue size história, utilização história, executor count história e outras métricas e estado para autoescala na última atualização pipeline . Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Monitorar medições de transmissão de pipeline
Visualização
Este recurso está em Visualização Pública.
Você pode view métricas sobre o progresso da sua transmissão para o pipeline declarativo LakeFlow . Consulte eventos stream_progress para obter eventos muito semelhantes às métricas StreamingQueryListener criadas pela transmissão estructurada, com as seguintes exceções:
- As seguintes métricas estão presentes em
StreamingQueryListener
, mas não emstream_progress
:numInputRows
,inputRowsPerSecond
eprocessedRowsPerSecond
. - Para transmissão de Kafka e Kineses, os campos
startOffset
,endOffset
elatestOffset
podem ser muito grandes e são truncados. Para cada um desses campos, um campo...Truncated
adicional,startOffsetTruncated
,endOffsetTruncated
elatestOffsetTruncated
, é adicionado com um valor Boolean para indicar se os dados serão truncados.
Para consultar eventos stream_progress
, você pode usar uma consulta como a seguinte:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Aqui está um exemplo de um evento, em JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Este exemplo mostra registros não truncados em uma fonte Kafka, com os campos ...Truncated
definidos como false
:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Auditar pipeline declarativo LakeFlow
Você pode usar registros log eventos do pipeline declarativo LakeFlow e outros logs de auditoria Databricks para obter uma visão completa de como os dados estão sendo atualizados no pipeline declarativo LakeFlow .
O pipeline declarativo LakeFlow usa as credenciais do proprietário pipeline para executar atualizações. Você pode alterar as credenciais usadas atualizando o proprietário do pipeline. O pipeline declarativo LakeFlow registra as ações do usuário no pipeline, incluindo criação de pipeline , edições na configuração e acionamento de atualizações.
Consulte Eventos do Unity Catalog para obter uma referência de eventos de auditoria do Unity Catalog.
Consultar ações do usuário no log de eventos
Você pode usar o log de eventos para auditar eventos, por exemplo, ações do usuário. Eventos que contêm informações sobre ações do usuário têm o tipo de evento user_action
.
informações sobre a ação são armazenadas no objeto user_action
no campo details
. Use a consulta a seguir para construir um log de auditoria de eventos do usuário. Isso pressupõe que você criou a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
|
|
|
---|---|---|
2021-05-20T19:36:03.517+0000 |
|
|
2021-05-20T19:35:59.913+0000 |
|
|
2021-05-27T00:35:51.971+0000 |
|
|
InformaçõesRuntime
Você pode view informações de tempo de execução para uma atualização pipeline , por exemplo, a versão Databricks Runtime para a atualização. Isso pressupõe que você tenha criado a view event_log_raw
para o pipeline no qual está interessado, conforme descrito em Consultar o logde eventos.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
|
---|
11.0 |