Pular para o conteúdo principal

Monitorar e observar o Auto Loader

Os pipelines do Auto Loader exigem monitoramento ativo para detectar problemas como backlogs crescentes, desvio de esquema, dados corrompidos e transmissões paralisadas antes que afetem os consumidores downstream. Esta página descreve como monitorar key métricas, consultar o estado no nível do arquivo, criar painéis de observabilidade e solucionar problemas comuns.

Para obter detalhes da configuração de produção, consulte Configurar o Auto Loader para cargas de trabalho de produção.

Pré-requisitos

Vários fluxos de trabalho de monitoramento nesta página dependem de cloud_files_state() para observar o estado de ingestão por arquivo — incluindo consultas de backlog, cálculos de latência e detecção de desvio de esquema. cloud_files_state() é uma função com valor de tabela que retorna o estado de ingestão em nível de arquivo para um checkpoint do Auto Loader. Nem todos os seus campos estão disponíveis por default. A disponibilidade depende da sua versão e configuração do Databricks Runtime:

  • Databricks Runtime 18.2 e acima : discovery_time, processed_time e commit_time estão disponíveis automaticamente. No Databricks Runtime 16.4–18.1, esses campos estão disponíveis somente quando cloudFiles.cleanSource estiver ativado.
  • **Databricks Runtime 16.4 e acima com cloudFiles.cleanSource archive_timehabilitado**:, archive_mode e move_location estão disponíveis.

A habilitação de cloudFiles.cleanSource tem alguma sobrecarga de desempenho. Compare com suas cargas de trabalho em um ambiente de pré-produção antes de habilitá-lo em produção.

Além disso:

  • Anote os dados ingeridos com a coluna _metadata. Capture no mínimo file_path e file_modification_time. Consulte Coluna de metadados de arquivo.
  • Habilitar as colunas _rescued_data e _corrupt_record.

Métricas key do Auto Loader

A tabela a seguir resume as métricas mais importantes a monitorar para pipelines do Auto Loader. Estas métricas estão disponíveis a partir de eventos de progresso StreamingQueryListener, com valores específicos do Auto Loader expostos no mapa metrics de cada fonte.

Métrica

O que informa

numFilesOutstanding

Número de arquivos na lista de pendências aguardando para serem processados

numBytesOutstanding

Tamanho do backlog de arquivos em bytes

approximateQueueSize

Profundidade da fila na cloud (somente modo de notificação de arquivo)

numInputRows

Linhas processadas por lote

inputRowsPerSecond

Taxa de chegada de dados

processedRowsPerSecond

Taxa de transferência de processamento

durationMs Detalhamento

Onde o tempo é gasto em cada lote

O que observar

Os seguintes padrões indicam que seu pipeline pode precisar de atenção.

  • Crescimento numFilesOutstanding : O backlog está aumentando. Seu pipeline está ficando para trás em relação aos dados de entrada.
  • processedRowsPerSecond ** < inputRowsPerSecond **: O pipeline está processando os dados mais lentamente do que eles chegam.
  • durationMs.latestOffsetgrande : A descoberta de arquivos é lenta. Considere mudar para eventos de arquivo.
  • Processamento de dados grande durationMs.addBatch : O processamento de dados é lento. Considere escalar o compute ou otimizar as transformações.

Para a referência completa de métricas, consulte métricas de origem do Auto Loader.

Consultar o estado de nível de arquivo com cloud_files_state

A função cloud_files_state() com valor de tabela fornece informações detalhadas sobre cada arquivo descoberto pelo Auto Loader. Os seguintes campos estão disponíveis. Os campos marcados como exigindo o Databricks Runtime 16.4 e acima ou 18.2 e acima são preenchidos apenas nas condições descritas em Pré-requisitos.

campo

Tipo

Descrição

path

STRING

O caminho do arquivo

size

BIGINT

O tamanho do arquivo em bytes.

create_time

TIMESTAMP

Quando o arquivo foi criado

discovery_time

TIMESTAMP

Quando o Auto Loader descobriu o arquivo (Databricks Runtime 16.4 e acima)

processed_time

TIMESTAMP

Quando o Auto Loader processou o arquivo (Databricks Runtime 16.4 e acima)

commit_time

TIMESTAMP

Quando o arquivo foi confirmado no checkpoint (Databricks Runtime 16.4 e acima)

archive_time

TIMESTAMP

Quando o arquivo foi arquivado (requer cloudFiles.cleanSource)

archive_mode

STRING

MOVE, DELETE ou NULL (requer cloudFiles.cleanSource)

move_location

STRING

Caminho de destino quando cloudFiles.cleanSource é MOVE

ingestion_state

STRING

Estado atual da ingestão de arquivos

Investigar o estado de ingestão do arquivo

As seguintes consultas abrangem cenários de diagnóstico comuns.

Encontrar todos os arquivos não processados (as pendências atuais):

SQL
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';

Latência média de ingestão do compute (tempo desde a criação do arquivo até o commit):

SQL
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;

Localizar arquivos corrompidos ou ignorados:

SQL
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';

Acompanhar progresso de arquivamento (exige cloudFiles.cleanSource):

SQL
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;

Encontre arquivos com alta latência da descoberta ao commit para identificar gargalos:

SQL
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;

Para a referência SQL completa, consulte cloud_files_state função com valor de tabela.

Monitorar o Auto Loader em Pipelines Declarativos do Lakeflow Spark

A Databricks recomenda o uso de Lakeflow Spark Declarative Pipelines para pipelines do Auto Loader de produção. Para aproveitar as capacidades de monitoramento integrado:

  • Armazene o log de eventos do Lakeflow Spark Declarative Pipelines em uma tabela Delta para que possa ser consultado em busca de dados de observabilidade. Configure isso nas configurações avançadas do pipeline ou na API. Para obter detalhes, consulte Log de eventos do pipeline.

  • Estruturar seu pipeline para observabilidade. Um pipeline do Auto Loader bem estruturado nos Lakeflow Spark Declarative Pipelines inclui uma view {table}_source (a definição da fonte do Auto Loader), uma tabela de transmissão {table}_bronze (ingestão de dados brutos com colunas _rescued_data e _corrupt_record), um corrupt_records_sink que coloca em quarentena linhas com dados não analisáveis e uma view limpa {table} para consumo downstream.

  • Defina expectativas em suas tabelas de transmissão bronze para monitorar drift de esquema e corrupção de dados. _rescued_data IS NULL detecta alterações inesperadas de esquema e _corrupt_record IS NULL detecta dados não analisáveis. Lakeflow Spark Declarative Pipelines avalia estas expectativas à medida que os dados chegam e gera um rastro de observabilidade. Você pode configurar expectativas para avisar, descartar linhas ou fazer o pipeline falhar.

Após criar a event_log_raw view para seu pipeline, use as seguintes consultas para métricas específicas do Auto Loader.

Monitore a taxa de transferência de ingestão por fluxo:

SQL
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;

Monitore o backlog de dados por fluxo:

SQL
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;

Resuma as violações de expectativa para detectar desvio de esquema e dados corrompidos:

SQL
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;

Para diretrizes gerais de monitoramento de LakeFlow Spark Declarative Pipelines, consulte Monitorar pipelines e Log de eventos do pipeline.

Monitorar o Auto Loader com transmissão estructurada

Ao executar o Auto Loader fora dos Pipelines Declarativos do LakeFlow Spark, utilize as seguintes abordagens de monitoramento de transmissão estructurada.

  • Implemente um StreamingQueryListener para capturar métricas específicas do Auto Loader de cada lote, lendo de source.metrics.
Python
from pyspark.sql.streaming import StreamingQueryListener

class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass

def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)

def onQueryIdle(self, event):
pass

def onQueryTerminated(self, event):
pass

spark.streams.addListener(AutoLoaderMonitor())
nota

A lógica de processamento em listeners pode retardar o processamento de consultas. Limite a computação em retornos de chamada do listener e evite gravações externas síncronas; em vez disso, emita telemetria leve assincronamente ou passe as métricas para um Job separado para persistência.

  • Use numInputRows, inputRowsPerSecond e processedRowsPerSecond do progresso da origem para compute a taxa de transferência — arquivos por segundo e linhas por segundo para cada lote.

  • Para calcular a latência de ingestão, compare create_time e commit_time de cloud_files_state() para latência de ponta a ponta. Para latência de processamento, use o detalhamento durationMs (por exemplo, latestOffset, addBatch e outras fases de lotes relatadas) para identificar qual estágio é o gargalo.

  • Use df.observe() para definir métricas de qualidade de dados em linha diretamente no DataFrame de transmissão. As métricas são visíveis nos eventos de progresso StreamingQueryListener em observedMetrics.

Python
from pyspark.sql.functions import count, lit, col

observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
  • Use .queryName() para atribuir um nome exclusivo a cada transmissão, facilitando a distinção das transmissões do Auto Loader na guia Streaming da Spark UI e nos painéis de monitoramento.

Para a referência completa de monitoramento de transmissão estructurada, consulte Monitorando consultas de transmissão estructurada no Databricks.

Criar um painel de observabilidade

Combine dados de múltiplas fontes para criar um painel abrangente de observabilidade para seus pipelines do Auto Loader. Esta tabela exibe algumas fontes sugeridas que podem ser utilizadas para estruturar seu painel de observabilidade.

Origem de dados

Dados de observabilidade

cloud_files_state()

Estado de ingestão em nível de arquivo: descoberta, processamento, commit e carimbos de data/hora de arquivamento por arquivo

log de eventos dos Lakeflow Spark Declarative Pipelines

História de execução do pipeline, métricas de fluxo por lotes e resultados de expectativa de qualidade de dados

Tabelas de saída do Pipeline

Contagens de linhas e volume de dados gravados por tabela ingerida

Você pode então agregar dados de observabilidade em tabelas dedicadas que servem como base para painéis e alertas:

  • Resumir status de execução de pipeline (sucesso ou falha) ao longo do tempo, derivado de event_type = 'update_progress' eventos.
  • Métricas agregadas de ingestão de arquivos (tamanho do backlog, taxa de transferência, latência por lote), derivadas dos eventos cloud_files_state() e event_type = 'flow_progress'.
  • Desenvolva estatísticas de tabela usando contagens de linhas e volume de dados por tabela, derivadas de num_output_rows no log de eventos.
  • Coletar informações de depuração de logs de erro detalhados e violações de expectativa por atualização, derivadas de event_type = 'flow_progress' eventos com data_quality preenchido.

Estas tabelas agregadas podem alimentar um AI/BI dashboard e alertas SQL. Os painéis de dashboard recomendados incluem linha do tempo de status de execução de pipeline, tendência de backlog de ingestão, tendência de taxa de transferência, distribuição de latência de ingestão, métricas de qualidade de dados, eventos de evolução do esquema e status de arquivamento de arquivos.

Monitorar eventos de evolução do esquema

Use as seguintes abordagens para detectar as alterações de esquema à medida que ocorrem.

  • Valores não NULOS em _rescued_data nas contagens de violação de expectativa indicam desvio de esquema. Consulte o log de eventos para failed_records > 0 na expectativa no rescued data.
  • Alterações no diretório _schemas dentro do cloudFiles.schemaLocation configurado (ou dentro do checkpoint somente quando o local do esquema não é definido separadamente) indicam que ocorreu uma evolução do esquema. Você pode sondar este diretório a partir de um Job de monitoramento separado.
  • Não trate um evento onQueryTerminated seguido por onQueryStarted para o mesmo nome de transmissão como evidência suficiente de evolução do esquema por si só. As transmissões reiniciam por diversas razões (reinícios de clusters, implantações de código, erros de armazenamento transitórios). Correlacione reinícios com sinais independentes — alterações de diretório _schemas ou violações de expectativa _rescued_data — antes de concluir que a evolução do esquema ocorreu.
  • Use _metadata.file_path para identificar quais arquivos introduziram alterações de esquema. Join isso com cloud_files_state() no campo path para correlacionar alterações de esquema com arquivos e lotes específicos.

Use esta consulta de exemplo para detectar o desvio de esquema recente por meio de violações de expectativa:

SQL
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;

Configurar alertas para problemas comuns

Use alertas do Databricks SQL ou notificações de pipeline para detectar problemas antes que afetem os consumidores a jusante.

O seguinte SQL detecta um backlog crescente e pode ser usado como base para um alerta do Databricks SQL. Programe-o para ser executado periodicamente (por exemplo, a cada 5 minutos) e alerte quando o resultado não for vazio.

SQL
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB

A tabela a seguir resume as condições de alerta recomendadas.

O que detectar

Como detectá-lo

Quando alertar

Backlog crescente

numFilesOutstanding tendência de alta

Aumento sustentado ao longo de vários lotes

Transmissão travada

Nenhum evento de progresso

Nenhum evento por N minutos (com base no intervalo de gatilho esperado)

Alta latência de ingestão

commit_time - create_time

Excede o seu limite de SLA

Degradação da qualidade dos dados

Taxa de falha de expectativa

Porcentagem crescente de linhas com falha nas expectativas

Evento de evolução do esquema

_rescued_data IS NOT NULL

Quaisquer valores não-NULOS na contagem de violação de expectativa

Descoberta lenta de arquivos

durationMs.latestOffset

Significativamente maior que a linha de base

Solucionar problemas comuns

A tabela a seguir descreve problemas comuns de pipeline do Auto Loader, suas prováveis causas e ações recomendadas para resolvê-los.

Problema

Possível causa

Ação recomendada

Backlog crescendo mais rápido que o processamento

Compute subdimensionado, distorção de dados ou limites de taxa limitados

Dimensione o compute, verifique o desvio com a Spark UI e revise as configurações de maxFilesPerTrigger para controlar o tamanho do lote.

Arquivos não sendo descobertos

Eventos de arquivo configurados incorretamente, problema de permissões ou transmissão não executada em 7 dias

Verifique as permissões de localização externa, verifique a configuração de eventos de arquivo na interface do usuário do Unity Catalog e garanta que a transmissão seja executada pelo menos a cada 7 dias para evitar a expiração do estado do RocksDB.

Startup da transmissão demorando muito

Download de estado de checkpoint grande (RocksDB)

Atualize para o Databricks Runtime 15.3 e acima para carregamento de estado assíncrono, o que reduz o tempo de startup em ~90%.

Processamento de arquivos duplicados

Configurações cloudFiles.maxFileAge agressivas ou corrupção de checkpoint

Use um maxFileAge conservador (mínimo de 90 dias), verifique a integridade do ponto de verificação e evite políticas de ciclo de vida no armazenamento do ponto de verificação.

Evolução do esquema causando reinícios do pipeline.

Alterações frequentes ou incompatíveis de esquema

Revise schemaEvolutionMode, mude para addNewColumnsWithTypeWidening para promoções de tipo ou use o tipo Variant para esquemas altamente dinâmicos.

Dados corrompidos acumulando no coletor

Problemas de qualidade de dados de origem

Verifique o _corrupt_record sink de quarentena para padrões, analise a geração de dados de origem e considere adicionar validação upstream

discovery_time e commit_time não preenchido

Em execução no Databricks Runtime abaixo de 18.2 sem cleanSource

Atualize para o Databricks Runtime 18.2 e acima ou habilite cloudFiles.cleanSource no Databricks Runtime 16.4–18.1.

Para solucionar problemas adicionais, consulte Perguntas frequentes sobre o Auto Loader.