Monitorar pipelines Delta Live Tables

Este artigo descreve como você pode usar recursos integrados no Delta Live Tables para monitoramento e observabilidade de pipelines, incluindo linhagem de dados, histórico de atualização e relatórios de qualidade de dados.

Você pode revisar a maioria dos dados de monitoramento manualmente por meio da IU de detalhes do pipeline. Algumas tarefas são mais fáceis de realizar consultando os metadados logs de eventos. Consulte O que são os logs de eventos do Delta Live Tables?.

Quais detalhes do pipeline estão disponíveis na IU?

O gráfico do pipeline é exibido assim que uma atualização de um pipeline é iniciada com sucesso. As setas representam dependências entre dataset no seu pipeline. Por default, a página de detalhes do pipeline mostra a atualização mais recente da tabela, mas você pode selecionar atualizações mais antigas em um menu suspenso.

Os detalhes exibidos incluem o ID do pipeline, a biblioteca de origem, o custo do compute, a edição do produto e o canal configurado para o pipeline.

Para ver uma tabular view do dataset, clique na Lista tab. A de lista view permite que você veja todos os dataset em seu pipeline representados como uma linha em uma tabela e é útil quando o DAG do pipeline é muito grande para ser visualizado na de gráfico view. Você pode controlar o dataset exibido na tabela usando vários filtros, como nome, tipo e status do dataset . Para voltar à visualização do DAG, clique em gráficos.

A execução como usuário é o proprietário do pipeline e o pipeline atualiza a execução com as permissões desse usuário. Para alterar o usuário run as , clique em Permissões e altere o proprietário do pipeline.

Como você pode visualizar os detalhes do conjunto de dados?

Clicar em um dataset no gráfico do pipeline ou na lista dataset exibe detalhes sobre o dataset. Os detalhes incluem o esquema dataset , métricas de qualidade de dados e um link para o código-fonte que define o dataset.

Ver histórico de atualização

Para view a história e o status das atualizações do pipeline, clique no menu suspenso atualizar história na barra superior.

Para view o gráfico, os detalhes e os eventos de uma atualização, selecione a atualização no menu suspenso. Para retornar à atualização mais recente, clique em Mostrar a atualização mais recente.

Receba notificações para eventos de pipeline

Para receber notificações em tempo real para eventos de pipeline, como conclusão bem-sucedida de uma atualização de pipeline ou falha de uma atualização de pipeline, adicione Adicionar notificações por email para eventos de pipeline ao criar ou editar um pipeline.

O que são os logs de eventos do Delta Live Tables?

O log de eventos Delta Live Tables contém todas as informações relacionadas a um pipeline, incluindo logs de auditoria, verificações de qualidade de dados, progresso do pipeline e linhagem de dados. Você pode usar os logs de eventos para rastrear, entender e monitorar o estado de seu pipeline de dados.

Você pode view entradas logs de eventos na interface do usuário Delta Live Tables, na API Delta Live Tables ou consultando diretamente os logs de eventos. Este artigo se concentra em consultar os logs de eventos diretamente.

Você também pode definir ações customizadas para execução quando eventos forem logs, por exemplo, envio de alerta, com event hooks.

Esquema logs de eventos

A tabela a seguir descreve o esquema logs de eventos. Alguns desses campos contêm dados JSON que requerem análise para realizar alguma query, como o campo details. Databricks oferece suporte ao operador : para analisar campos JSON. Veja : (sinal de dois pontos) operador.

Campo

Descrição

id

Um identificador exclusivo para o registro logs de eventos.

sequence

Um documento JSON contendo metadados para identificar e ordenar eventos.

origin

Um documento JSON contendo metadados para a origem do evento, por exemplo, o provedor clouds , a região do provedor clouds , user_id, pipeline_id ou pipeline_type para mostrar onde o pipeline foi criado, DBSQL ou WORKSPACE.

timestamp

A hora em que o evento foi registrado.

message

Uma mensagem legível por humanos descrevendo o evento.

level

O tipo de evento, por exemplo, INFO, WARN, ERROR ou METRICS.

error

Se ocorreu um erro, detalhes descrevendo o erro.

details

Um documento JSON contendo detalhes estruturados do evento. Este é o campo principal usado para analisar eventos.

event_type

O tipo de evento.

maturity_level

A estabilidade do esquema de eventos. Os valores possíveis são:

  • STABLE: O esquema é estável e não será alterado.

  • NULL: O esquema é estável e não será alterado. O valor pode ser NULL se o registro foi criado antes de o campo maturity_level ser adicionado (versão 2022.37).

  • EVOLVING: O esquema não é estável e pode mudar.

  • DEPRECATED: o esquema está obsoleto e o Delta Live Tables Runtime pode parar de produzir este evento a qualquer momento.

Consultando os logsde eventos

A localização dos logs de eventos e a interface para query os logs de eventos dependem se o seu pipeline está configurado para usar o Hive metastore ou Unity Catalogs.

Hive metastore

Se seu pipeline publicar tabelas no Hive metastore, os logs de eventos serão armazenados em /system/events no local storage. Por exemplo, se você configurou o pipeline storage como /Users/username/data, os logs de eventos são armazenados no caminho /Users/username/data/system/events no DBFS.

Se você não configurou a configuração storage, o local default logs de eventos é /pipelines/<pipeline-id>/system/events no DBFS. Por exemplo, se o ID do pipeline for 91de5e48-35ed-11ec-8d3d-0242ac130003, o local de armazenamento será /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

Você pode criar uma view para simplificar a consulta dos logs de eventos. O exemplo a seguir cria uma view temporária chamada event_log_raw. Esta view é usada no exemplo query logs de eventos incluído nestes artigos:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Substitua <event-log-path> pelo local logs de eventos.

Cada instância de uma execução de pipeline é chamada de atualização. Muitas vezes você deseja extrair informações para a atualização mais recente. execute a query a seguir para localizar o identificador da atualização mais recente e salve-o na view temporária latest_update_id . Esta view é usada no exemplo query logs de eventos incluído nestes artigos:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Você pode query os logs in um Databricks Notebook ou no editor SQL. Use um Notebook ou o editor SQL para executar a query logs eventos de exemplo.

Unity Catalog

Se seu pipeline publica tabelas para Unity Catalogs, você deve usar a função de valor de tabela event_log (TVF) para buscar os logs de eventos para o pipeline. Você recupera os logs de eventos de um pipeline passando a ID do pipeline ou um nome de tabela para o TVF. Por exemplo, para recuperar os registros logs de eventos para o pipeline com ID 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Para recuperar os registros logs de eventos para o pipeline que criou ou possui a tabela my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Para chamar o TVF, você deve usar clusters compartilhados ou um armazém SQL. Por exemplo, você pode usar um Notebook anexado a clusters compartilhados ou usar o editor SQL conectado a um SQL warehouse.

Para simplificar a consulta de eventos para um pipeline, o proprietário do pipeline pode criar uma view sobre o event_log TVF. O exemplo a seguir cria uma view dos logs de eventos de um pipeline. Essa view é usada na query logs de eventos de exemplo incluída neste artigo.

Observação

O TVF event_log pode ser chamado apenas pelo proprietário do pipeline e uma view criada sobre o TVF event_log pode ser query apenas pelo proprietário do pipeline. A view não pode ser compartilhada com outros usuários.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Substitua <pipeline-ID> pelo identificador exclusivo do pipeline Delta Live Tables. Você pode encontrar o ID no painel de detalhes do pipeline na IU do Delta Live Tables.

Cada instância de uma execução de pipeline é chamada de atualização. Muitas vezes você deseja extrair informações para a atualização mais recente. execute a query a seguir para localizar o identificador da atualização mais recente e salve-o na view temporária latest_update_id . Esta view é usada no exemplo query logs de eventos incluído nestes artigos:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Consultar informações de linhagem dos logsde eventos

Eventos contendo informações sobre linhagem possuem o tipo de evento flow_definition. O objeto details:flow_definition contém o output_dataset e input_datasets que definem cada relacionamento no gráfico.

Você pode usar a seguinte query para extrair o dataset de entrada e saída para ver a informação de linhagem:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id

output_dataset

input_datasets

1

customers

null

2

sales_orders_raw

null

3

sales_orders_cleaned

["customers", "sales_orders_raw"]

4

sales_order_in_la

["sales_orders_cleaned"]

Consultar a qualidade dos dados dos logsde eventos

Se você definir expectativas no dataset em seu pipeline, as métricas de qualidade de dados serão armazenadas no objeto details:flow_progress.data_quality.expectations. Eventos contendo informações sobre qualidade de dados possuem o tipo de evento flow_progress. O exemplo a seguir query as métricas de qualidade de dados para a última atualização do pipeline:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name

dataset

expectation

passing_records

failing_records

1

sales_orders_cleaned

valid_order_number

4083

0

Monitorelogs de dados de volta consultando os logsde eventos

Delta Live Tables rastreia quantos dados estão presentes no backlog no objeto details:flow_progress.metrics.backlog_bytes . Os eventos que contêm métricas de backlog têm o tipo de evento flow_progress. O exemplo a seguir query métricas de backlog para a última atualização do pipeline:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Observação

As métricas de backlog podem não estar disponíveis dependendo do tipo de fonte de dados do pipeline e da versão do Databricks Runtime.

Monitore eventos de autoscale aprimorado dos logsde eventos

Os logs de eventos capturam redimensionamentos clusters quando o autoscale aprimorado está habilitado em seus pipelines. Os eventos que contêm informações sobre autoscale aprimorada têm o tipo de evento autoscale. A informação da solicitação de redimensionamento clusters é armazenada no objeto details:autoscale. O exemplo a seguir query as solicitações de redimensionamento clusters autoscale automático aprimorado para a última atualização do pipeline:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Monitore a utilização de recursos de computação

cluster_resources os eventos fornecem métricas sobre o número de slots de tarefa nos clusters, quanto esses slots de tarefa são utilizados e quantas tarefas estão esperando para serem agendadas.

Quando autoscale aprimorado está ativado, os eventos cluster_resources também contêm métricas para o algoritmo autoscale , incluindo latest_requested_num_executors e optimal_num_executors. Os eventos também mostram o status do algoritmo como estados diferentes, como CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS e BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Esta informação pode ser view em conjunto com os eventos autoscale para fornecer uma visão geral da autoscale aprimorada.

O exemplo a seguir query o histórico do tamanho da fila de tarefas para a última atualização do pipeline:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

O exemplo a seguir query o histórico de utilização para a última atualização do pipeline:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

O exemplo a seguir query o histórico de contagem do executor, acompanhado de métricas disponíveis apenas para pipelines de autoscale aprimorado, incluindo o número de executores solicitados pelo algoritmo na solicitação mais recente, o número ideal de executores recomendado pelo algoritmo com base nas métricas mais recentes e o estado do algoritmo autoscale :

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Auditar pipelines Delta Live Tables

Você pode usar os registros de log de eventos do Delta Live Tables e outros logs de auditoria do Databricks para obter uma visão completa de como os dados estão sendo atualizados no Delta Live Tables.

Delta Live Tables usa as credenciais do proprietário do pipeline para atualizações de execução. Você pode alterar as credenciais usadas atualizando o proprietário do pipeline. Delta Live Tables registra o usuário para ações no pipeline, incluindo criação de pipeline, edições na configuração e atualizações de acionamento.

Consulte EventosUnity Catalog para obter uma referência dos eventos de auditoria Unity Catalog .

Consultar ações do usuário nos logsde eventos

Você pode usar os logs de eventos para auditar eventos, por exemplo, ações do usuário. Eventos contendo informações sobre ações do usuário possuem o tipo de evento user_action.

informações sobre a ação são armazenadas no objeto user_action no campo details . Use a query a seguir para construir logs de auditoria de eventos do usuário. Para criar a view event_log_raw usada nesta consulta, consulte Consultando os logsde eventos.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'

timestamp

action

user_name

1

2021-05-20T19:36:03.517+0000

START

user@company.com

2

2021-05-20T19:35:59.913+0000

CREATE

user@company.com

3

2021-05-27T00:35:51.971+0000

START

user@company.com

Informação Runtime

Você pode view informações de tempo de execução para uma atualização de pipeline, por exemplo, a versão do Databricks Runtime para a atualização:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'

dbr_version

1

11,0