Pular para o conteúdo principal

LakeFlow Evento declarativo do pipeline log

O evento de pipeline declarativo LakeFlow log contém todas as informações relacionadas a um pipeline, incluindo auditoria logs, verificações de qualidade de dados, progresso de pipeline e linhagem de dados. É possível utilizar o evento “ log ” para acompanhar, compreender e monitorar o estado do seu pipeline de dados.

É possível obter informações sobre eventos view e entradas log na interface de usuário do pipeline declarativo LakeFlow, no pipeline declarativo LakeFlow API, ou consultando diretamente o evento log. Esta seção concentra-se em consultar o log de eventos diretamente.

Também é possível definir ações personalizadas para serem executadas quando eventos são registrados, por exemplo, enviar alertas, com ganchos de eventos.

important

Não exclua o log de eventos, o catálogo pai ou o esquema onde o log de eventos está publicado. A exclusão do evento “ log ” pode resultar na falha da atualização do “ pipeline ” durante execuções futuras.

Para obter detalhes completos sobre o esquema do evento log, consulte LakeFlow Esquema do evento de pipeline declarativo log.

Consultar o log de eventos

nota

Esta seção descreve o comportamento e a sintaxe do default para trabalhar com eventos logs para pipeline configurado com Unity Catalog e o modo de publicação default.

Por default, LakeFlow O pipeline declarativo grava o evento log em uma tabela oculta Delta no catálogo default e no esquema configurado para pipeline. Embora oculta, a tabela ainda pode ser consultada por todos os usuários suficientemente privilegiados. defaultPor padrão, apenas o proprietário de pipeline pode consultar a tabela de eventos log.

Para consultar o log de eventos como proprietário, utilize o ID do pipeline:

SQL
SELECT * FROM event_log(<pipelineId>);

Por default, o nome do evento oculto log é formatado como event_log_{pipeline_id}, onde o ID pipeline é o UUID atribuído pelo sistema com os traços substituídos por sublinhado.

É possível publicar o log de eventos editando as configurações avançadas do pipeline e, em seguida, selecionando Publicar log de eventos no metastore . Para obter detalhes, consulte Configuração do pipeline para o log de eventos. Ao publicar um log de eventos, é necessário especificar o nome do log de eventos e, opcionalmente, um catálogo e um esquema, conforme mostrado no exemplo a seguir:

JSON
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}

O local do log de eventos também serve como local do esquema para quaisquer consultas do Auto Loader no pipeline. Databricks Recomenda-se criar uma restrição de acesso ( view ) sobre a tabela de eventos ( log ) antes de modificar os privilégios, pois algumas configurações de restrição de acesso ( compute ) podem permitir que os usuários obtenham acesso aos metadados do esquema se a tabela de eventos ( log ) for compartilhada diretamente. A sintaxe de exemplo a seguir cria uma coluna de tipo " view " em uma tabela de eventos " log " e é utilizada nas consultas de exemplo " log " incluídas neste artigo. Substitua <catalog_name>.<schema_name>.<event_log_table_name> pelo nome completo da tabela do log de eventos do seu pipeline. Se você publicou o log de eventos, utilize o nome especificado durante a publicação. Caso contrário, utilize event_log(<pipelineId>), onde pipelineId é o ID do pipeline que você deseja consultar.

SQL
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

Em Unity Catalog, visualize as consultas de suporte à transmissão. O exemplo a seguir utiliza transmissão estruturada para consultar uma tabela de eventos ( view ) definida no topo de uma tabela de eventos ( log ):

Python
df = spark.readStream.table("event_log_raw")

O proprietário do pipeline pode publicar o log de eventos como uma tabela Delta pública, alternando a opção “ Publish event log to metastore ” (Exibir log de eventos) na seção “Advanced” (Avançado ) da configuração do pipeline. Opcionalmente, é possível especificar um novo nome de tabela, catálogo e esquema para o log de eventos.

Exemplos de consultas básicas

Os exemplos a seguir mostram como consultar o evento log para obter informações gerais sobre o pipeline e auxiliar na depuração de cenários comuns.

Monitorar atualizações do pipeline consultando atualizações anteriores

O exemplo a seguir consulta as atualizações (ou execução ) de seu pipeline, mostrando o ID da atualização, status, hora de início, hora de conclusão e duração. Isso fornece uma visão geral da execução do pipeline.

Presume-se que o usuário criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Depurar problemas de materialização de view incremental refresh

Este exemplo consulta todos os fluxos da atualização mais recente de um pipeline. Mostra se foram atualizados incrementalmente ou não, bem como outras informações de planejamento relevantes que são úteis para depuração, caso um refresh incremental não ocorra.

Presume-se que o usuário criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Consultar o custo de uma atualização do pipeline

Este exemplo demonstra como consultar o uso do DBU para um pipeline, bem como o usuário para uma determinada execução do pipeline.

SQL
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;

Consultas avançadas

Os exemplos a seguir mostram como consultar o log de eventos para lidar com cenários menos comuns ou mais avançados.

Consultar métricas para todos os fluxos em um pipeline

Este exemplo demonstra como consultar informações detalhadas sobre cada fluxo em um fluxograma ( pipeline). Ele exibe o nome do fluxo, a duração da atualização, as métricas de qualidade dos dados e informações sobre as linhas processadas (linhas de saída, registros excluídos, atualizados e descartados).

Presume-se que o usuário criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array

FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations

FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,

af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Consultar métricas de qualidade ou expectativas dos dados

Se definir expectativas para conjuntos de dados no seu pipeline, as métricas para o número de registros que passaram e falharam em uma expectativa serão armazenadas no objeto details:flow_progress.data_quality.expectations. As métricas para o número de registros descartados são armazenadas no objeto details:flow_progress.data_quality. Os eventos que contêm informações sobre a qualidade dos dados têm o tipo de evento “ flow_progress”.

As métricas de qualidade dos dados podem não estar disponíveis para alguns conjuntos de dados. Veja as limitações das expectativas.

As seguintes métricas de qualidade dos dados estão disponíveis:

Métrica

Descrição

dropped_records

O número de registros que foram descartados porque falharam em uma ou mais expectativas.

passed_records

O número de registros que atenderam aos critérios de expectativa.

failed_records

O número de registros que falharam nos critérios de expectativa.

O exemplo a seguir consulta as métricas de qualidade dos dados para a última atualização do pipeline. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;

Consultar informações de linhagem

Eventos que contêm informações sobre linhagem têm o tipo de evento “ flow_definition”. O objeto " details:flow_definition " contém os objetos " output_dataset " e " input_datasets ", que definem cada relação no gráfico.

Utilize a consulta a seguir para extrair os conjuntos de dados de entrada e saída para visualizar as informações de linhagem. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Monitorar a ingestão de arquivos na nuvem com o Auto Loader

LakeFlow O pipeline declarativo gera eventos quando um Auto Loader a processa arquivos. Para eventos do Auto Loader, o event_type é operation_progress e o details:operation_progress:type é AUTO_LOADER_LISTING ou AUTO_LOADER_BACKFILL. O objeto details:operation_progress também inclui os campos status, duration_ms, auto_loader_details:source_path e auto_loader_details:num_files_listed.

O exemplo a seguir consulta os eventos do Auto Loader para obter a atualização mais recente. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Monitorar o acúmulo de dados para otimizar a duração da transmissão

LakeFlow O pipeline declarativo rastreia a quantidade de dados presentes no backlog no objeto " details:flow_progress.metrics.backlog_bytes ". Os eventos que contêm métricas de backlog têm o tipo de evento flow_progress. O exemplo a seguir consulta as métricas do backlog para a última atualização do pipeline. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
nota

As métricas de backlog podem não estar disponíveis dependendo do tipo de fonte de dados do pipeline e da versão do Databricks Runtime.

Monitorar eventos de autoescala para otimizar o clássico compute

Para um pipeline declarativo do LakeFlow que utiliza o controle de cluster clássico ( compute , ou seja, não utiliza o controle de cluster avançado ( serverless compute)), o evento log captura as redimensionamentos do clustering quando o autoscale aprimorado está habilitado no seu pipeline. Os eventos que contêm informações sobre o autoscale aprimorado têm o tipo de evento “ autoscale”. A solicitação de redimensionamento do agrupamento é armazenada no objeto details:autoscale.

O exemplo a seguir consulta as solicitações de redimensionamento de clustering com autoescala aprimorado para a última atualização do pipeline. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id

Monitorar a utilização de recursos do compute para clássico compute

cluster_resources Os eventos fornecem métricas sobre o número de slots de tarefas no agrupamento, o quanto esses slots de tarefas são utilizados e quantas tarefas estão aguardando para serem agendadas.

Quando o autoscale aprimorado está ativado, os eventos cluster_resources também contêm métricas para o algoritmo de autoscale, incluindo latest_requested_num_executors e optimal_num_executors. Os eventos também mostram o status do algoritmo em diferentes estados, como CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS e BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Essas informações podem ser visualizadas em conjunto com os eventos de ajuste automático para fornecer uma visão geral do ajuste automático aprimorado.

O exemplo a seguir consulta o tamanho da fila de tarefas, o histórico de utilização, a contagem de " executor " e outras métricas e status para o autoscale na última atualização do " pipeline ". Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;

Monitor pipeline transmissão métricas

info

Visualização

Esse recurso está em Public Preview.

O senhor pode view métricas sobre o progresso da transmissão para LakeFlow Declarative pipeline. Consulta a eventos stream_progress para obter eventos muito semelhantes às métricas do StreamingQueryListener criadas pela transmissão estruturada, com as seguintes exceções:

  • As seguintes métricas estão presentes em StreamingQueryListener, mas não em stream_progress: numInputRows, inputRowsPerSecond, e processedRowsPerSecond.
  • Para a transmissão Kafka e Kineses, os campos startOffset, endOffset e latestOffset podem ser muito grandes e são truncados. Para cada um desses campos, um campo ...Truncated adicional, startOffsetTruncated, endOffsetTruncated e latestOffsetTruncated, é adicionado com um valor Boolean para saber se os dados estão truncados.

Para consultar eventos stream_progress, você pode usar uma consulta como a seguinte:

SQL
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Aqui está um exemplo de um evento, em JSON:

JSON
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}

Este exemplo mostra registros não truncados em uma fonte Kafka, com os campos ...Truncated definidos como false:

JSON
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}

Auditoria de pipeline declarativo do LakeFlow

É possível utilizar LakeFlow para obter registros de eventos declarativos do pipeline log e outros registros de auditoria Databricks logs para obter uma visão completa de como os dados estão sendo atualizados em LakeFlow.

LakeFlow O pipeline declarativo utiliza as credenciais do proprietário do pipeline para executar atualizações. É possível alterar as credenciais utilizadas atualizando o proprietário do pipeline. LakeFlow O pipeline declarativo registra o usuário para ações no pipeline de configuração ( pipeline), incluindo a criação de pipeline, edições na configuração e acionamento de atualizações.

Consulte os eventosUnity Catalog para obter uma referência dos eventos de auditoria Unity Catalog.

Consultar ações do usuário no log de eventos

É possível utilizar o log de eventos para auditar eventos, como ações do usuário. Os eventos que contêm informações sobre as ações do usuário têm o tipo de evento “ user_action”.

As informações sobre a ação são armazenadas no objeto " user_action " no campo " details ". Utilize a consulta a seguir para criar um log de auditoria dos eventos do usuário. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp

action

user_name

2021-05-20T 19:36:03.517 +0000

START

user@company.com

2021-05-20T 19:35:59.913 +0000

CREATE

user@company.com

2021-05-27T 00:35:51.971 +0000

START

user@company.com

Runtime Informação

É possível obter informações de tempo de execução do view para uma atualização do pipeline, por exemplo, a versão Databricks Runtime para a atualização. Isso pressupõe que você criou o event_log_raw view para o pipeline no qual está interessado, conforme descrito em Consultar o evento log.

SQL
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version

11.0