Monitorar e observar o Auto Loader
Os pipelines do Auto Loader exigem monitoramento ativo para detectar problemas como backlogs crescentes, desvio de esquema, dados corrompidos e transmissões paralisadas antes que afetem os consumidores downstream. Esta página descreve como monitorar key métricas, consultar o estado no nível do arquivo, criar painéis de observabilidade e solucionar problemas comuns.
Para obter detalhes da configuração de produção, consulte Configurar o Auto Loader para cargas de trabalho de produção.
Pré-requisitos
Vários fluxos de trabalho de monitoramento nesta página dependem de cloud_files_state() para observar o estado de ingestão por arquivo — incluindo consultas de backlog, cálculos de latência e detecção de desvio de esquema. cloud_files_state() é uma função com valor de tabela que retorna o estado de ingestão em nível de arquivo para um checkpoint do Auto Loader. Nem todos os seus campos estão disponíveis por default. A disponibilidade depende da sua versão e configuração do Databricks Runtime:
- Databricks Runtime 18.2 e acima :
discovery_time,processed_timeecommit_timeestão disponíveis automaticamente. No Databricks Runtime 16.4–18.1, esses campos estão disponíveis somente quandocloudFiles.cleanSourceestiver ativado. - **Databricks Runtime 16.4 e acima com
cloudFiles.cleanSourcearchive_timehabilitado**:,archive_modeemove_locationestão disponíveis.
A habilitação de cloudFiles.cleanSource tem alguma sobrecarga de desempenho. Compare com suas cargas de trabalho em um ambiente de pré-produção antes de habilitá-lo em produção.
Além disso:
- Anote os dados ingeridos com a coluna
_metadata. Capture no mínimofile_pathefile_modification_time. Consulte Coluna de metadados de arquivo. - Habilitar as colunas
_rescued_datae_corrupt_record.
Métricas key do Auto Loader
A tabela a seguir resume as métricas mais importantes a monitorar para pipelines do Auto Loader. Estas métricas estão disponíveis a partir de eventos de progresso StreamingQueryListener, com valores específicos do Auto Loader expostos no mapa metrics de cada fonte.
Métrica | O que informa |
|---|---|
| Número de arquivos na lista de pendências aguardando para serem processados |
| Tamanho do backlog de arquivos em bytes |
| Profundidade da fila na cloud (somente modo de notificação de arquivo) |
| Linhas processadas por lote |
| Taxa de chegada de dados |
| Taxa de transferência de processamento |
| Onde o tempo é gasto em cada lote |
O que observar
Os seguintes padrões indicam que seu pipeline pode precisar de atenção.
- Crescimento
numFilesOutstanding: O backlog está aumentando. Seu pipeline está ficando para trás em relação aos dados de entrada. processedRowsPerSecond** <inputRowsPerSecond**: O pipeline está processando os dados mais lentamente do que eles chegam.durationMs.latestOffsetgrande : A descoberta de arquivos é lenta. Considere mudar para eventos de arquivo.- Processamento de dados grande
durationMs.addBatch: O processamento de dados é lento. Considere escalar o compute ou otimizar as transformações.
Para a referência completa de métricas, consulte métricas de origem do Auto Loader.
Consultar o estado de nível de arquivo com cloud_files_state
A função cloud_files_state() com valor de tabela fornece informações detalhadas sobre cada arquivo descoberto pelo Auto Loader. Os seguintes campos estão disponíveis. Os campos marcados como exigindo o Databricks Runtime 16.4 e acima ou 18.2 e acima são preenchidos apenas nas condições descritas em Pré-requisitos.
campo | Tipo | Descrição |
|---|---|---|
|
| O caminho do arquivo |
|
| O tamanho do arquivo em bytes. |
|
| Quando o arquivo foi criado |
|
| Quando o Auto Loader descobriu o arquivo (Databricks Runtime 16.4 e acima) |
|
| Quando o Auto Loader processou o arquivo (Databricks Runtime 16.4 e acima) |
|
| Quando o arquivo foi confirmado no checkpoint (Databricks Runtime 16.4 e acima) |
|
| Quando o arquivo foi arquivado (requer |
|
|
|
|
| Caminho de destino quando |
|
| Estado atual da ingestão de arquivos |
Investigar o estado de ingestão do arquivo
As seguintes consultas abrangem cenários de diagnóstico comuns.
Encontrar todos os arquivos não processados (as pendências atuais):
SELECT * FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state != 'COMMITTED';
Latência média de ingestão do compute (tempo desde a criação do arquivo até o commit):
SELECT avg(unix_timestamp(commit_time) - unix_timestamp(create_time)) AS avg_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL AND create_time IS NOT NULL;
Localizar arquivos corrompidos ou ignorados:
SELECT path, ingestion_state, size, create_time
FROM cloud_files_state('path/to/checkpoint')
WHERE ingestion_state LIKE 'SKIPPED%';
Acompanhar progresso de arquivamento (exige cloudFiles.cleanSource):
SELECT archive_mode, count(*) AS file_count
FROM cloud_files_state('path/to/checkpoint')
GROUP BY archive_mode;
Encontre arquivos com alta latência da descoberta ao commit para identificar gargalos:
SELECT
path,
size,
unix_timestamp(commit_time) - unix_timestamp(discovery_time) AS processing_latency_seconds,
unix_timestamp(commit_time) - unix_timestamp(create_time) AS end_to_end_latency_seconds
FROM cloud_files_state('path/to/checkpoint')
WHERE commit_time IS NOT NULL
ORDER BY end_to_end_latency_seconds DESC
LIMIT 20;
Para a referência SQL completa, consulte cloud_files_state função com valor de tabela.
Monitorar o Auto Loader em Pipelines Declarativos do Lakeflow Spark
A Databricks recomenda o uso de Lakeflow Spark Declarative Pipelines para pipelines do Auto Loader de produção. Para aproveitar as capacidades de monitoramento integrado:
-
Armazene o log de eventos do Lakeflow Spark Declarative Pipelines em uma tabela Delta para que possa ser consultado em busca de dados de observabilidade. Configure isso nas configurações avançadas do pipeline ou na API. Para obter detalhes, consulte Log de eventos do pipeline.
-
Estruturar seu pipeline para observabilidade. Um pipeline do Auto Loader bem estruturado nos Lakeflow Spark Declarative Pipelines inclui uma view
{table}_source(a definição da fonte do Auto Loader), uma tabela de transmissão{table}_bronze(ingestão de dados brutos com colunas_rescued_datae_corrupt_record), umcorrupt_records_sinkque coloca em quarentena linhas com dados não analisáveis e uma view limpa{table}para consumo downstream. -
Defina expectativas em suas tabelas de transmissão bronze para monitorar drift de esquema e corrupção de dados.
_rescued_data IS NULLdetecta alterações inesperadas de esquema e_corrupt_record IS NULLdetecta dados não analisáveis. Lakeflow Spark Declarative Pipelines avalia estas expectativas à medida que os dados chegam e gera um rastro de observabilidade. Você pode configurar expectativas para avisar, descartar linhas ou fazer o pipeline falhar.
Após criar a event_log_raw view para seu pipeline, use as seguintes consultas para métricas específicas do Auto Loader.
Monitore a taxa de transferência de ingestão por fluxo:
SELECT
origin.flow_name,
origin.update_id,
timestamp,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS rows_written
FROM event_log_raw
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC;
Monitore o backlog de dados por fluxo:
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
ORDER BY timestamp DESC;
Resuma as violações de expectativa para detectar desvio de esquema e dados corrompidos:
SELECT
origin.flow_name,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS expectation
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL;
Para diretrizes gerais de monitoramento de LakeFlow Spark Declarative Pipelines, consulte Monitorar pipelines e Log de eventos do pipeline.
Monitorar o Auto Loader com transmissão estructurada
Ao executar o Auto Loader fora dos Pipelines Declarativos do LakeFlow Spark, utilize as seguintes abordagens de monitoramento de transmissão estructurada.
- Implemente um
StreamingQueryListenerpara capturar métricas específicas do Auto Loader de cada lote, lendo desource.metrics.
from pyspark.sql.streaming import StreamingQueryListener
class AutoLoaderMonitor(StreamingQueryListener):
def onQueryStarted(self, event):
pass
def onQueryProgress(self, event):
for source in event.progress.sources:
if "CloudFilesSource" in source.description:
metrics = source.metrics
files_outstanding = metrics.get("numFilesOutstanding", "0")
bytes_outstanding = metrics.get("numBytesOutstanding", "0")
rows_per_sec = source.processedRowsPerSecond
# Push metrics to your monitoring system (for example, write to a Delta table)
def onQueryIdle(self, event):
pass
def onQueryTerminated(self, event):
pass
spark.streams.addListener(AutoLoaderMonitor())
A lógica de processamento em listeners pode retardar o processamento de consultas. Limite a computação em retornos de chamada do listener e evite gravações externas síncronas; em vez disso, emita telemetria leve assincronamente ou passe as métricas para um Job separado para persistência.
-
Use
numInputRows,inputRowsPerSecondeprocessedRowsPerSeconddo progresso da origem para compute a taxa de transferência — arquivos por segundo e linhas por segundo para cada lote. -
Para calcular a latência de ingestão, compare
create_timeecommit_timedecloud_files_state()para latência de ponta a ponta. Para latência de processamento, use o detalhamentodurationMs(por exemplo,latestOffset,addBatche outras fases de lotes relatadas) para identificar qual estágio é o gargalo. -
Use
df.observe()para definir métricas de qualidade de dados em linha diretamente no DataFrame de transmissão. As métricas são visíveis nos eventos de progressoStreamingQueryListeneremobservedMetrics.
from pyspark.sql.functions import count, lit, col
observed_df = df.observe(
"auto_loader_quality",
count(lit(1)).alias("total_rows"),
count(col("_rescued_data")).alias("rescued_rows"),
count(col("_corrupt_record")).alias("corrupt_rows")
)
- Use
.queryName()para atribuir um nome exclusivo a cada transmissão, facilitando a distinção das transmissões do Auto Loader na guia Streaming da Spark UI e nos painéis de monitoramento.
Para a referência completa de monitoramento de transmissão estructurada, consulte Monitorando consultas de transmissão estructurada no Databricks.
Criar um painel de observabilidade
Combine dados de múltiplas fontes para criar um painel abrangente de observabilidade para seus pipelines do Auto Loader. Esta tabela exibe algumas fontes sugeridas que podem ser utilizadas para estruturar seu painel de observabilidade.
Origem de dados | Dados de observabilidade |
|---|---|
| Estado de ingestão em nível de arquivo: descoberta, processamento, commit e carimbos de data/hora de arquivamento por arquivo |
log de eventos dos Lakeflow Spark Declarative Pipelines | História de execução do pipeline, métricas de fluxo por lotes e resultados de expectativa de qualidade de dados |
Tabelas de saída do Pipeline | Contagens de linhas e volume de dados gravados por tabela ingerida |
Você pode então agregar dados de observabilidade em tabelas dedicadas que servem como base para painéis e alertas:
- Resumir status de execução de pipeline (sucesso ou falha) ao longo do tempo, derivado de
event_type = 'update_progress'eventos. - Métricas agregadas de ingestão de arquivos (tamanho do backlog, taxa de transferência, latência por lote), derivadas dos eventos
cloud_files_state()eevent_type = 'flow_progress'. - Desenvolva estatísticas de tabela usando contagens de linhas e volume de dados por tabela, derivadas de
num_output_rowsno log de eventos. - Coletar informações de depuração de logs de erro detalhados e violações de expectativa por atualização, derivadas de
event_type = 'flow_progress'eventos comdata_qualitypreenchido.
Estas tabelas agregadas podem alimentar um AI/BI dashboard e alertas SQL. Os painéis de dashboard recomendados incluem linha do tempo de status de execução de pipeline, tendência de backlog de ingestão, tendência de taxa de transferência, distribuição de latência de ingestão, métricas de qualidade de dados, eventos de evolução do esquema e status de arquivamento de arquivos.
Monitorar eventos de evolução do esquema
Use as seguintes abordagens para detectar as alterações de esquema à medida que ocorrem.
- Valores não NULOS em
_rescued_datanas contagens de violação de expectativa indicam desvio de esquema. Consulte o log de eventos parafailed_records > 0na expectativano rescued data. - Alterações no diretório
_schemasdentro docloudFiles.schemaLocationconfigurado (ou dentro do checkpoint somente quando o local do esquema não é definido separadamente) indicam que ocorreu uma evolução do esquema. Você pode sondar este diretório a partir de um Job de monitoramento separado. - Não trate um evento
onQueryTerminatedseguido poronQueryStartedpara o mesmo nome de transmissão como evidência suficiente de evolução do esquema por si só. As transmissões reiniciam por diversas razões (reinícios de clusters, implantações de código, erros de armazenamento transitórios). Correlacione reinícios com sinais independentes — alterações de diretório_schemasou violações de expectativa_rescued_data— antes de concluir que a evolução do esquema ocorreu. - Use
_metadata.file_pathpara identificar quais arquivos introduziram alterações de esquema. Join isso comcloud_files_state()no campopathpara correlacionar alterações de esquema com arquivos e lotes específicos.
Use esta consulta de exemplo para detectar o desvio de esquema recente por meio de violações de expectativa:
SELECT
timestamp,
origin.flow_name,
exp.name AS expectation_name,
exp.failed_records
FROM (
SELECT
timestamp,
origin,
explode(from_json(
details:flow_progress.data_quality.expectations,
'array<struct<name:string, dataset:string, passed_records:bigint, failed_records:bigint>>'
)) AS exp
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.data_quality.expectations IS NOT NULL
)
WHERE exp.name = '<rescued-data expectation name>'
AND exp.failed_records > 0
ORDER BY timestamp DESC;
Configurar alertas para problemas comuns
Use alertas do Databricks SQL ou notificações de pipeline para detectar problemas antes que afetem os consumidores a jusante.
O seguinte SQL detecta um backlog crescente e pode ser usado como base para um alerta do Databricks SQL. Programe-o para ser executado periodicamente (por exemplo, a cada 5 minutos) e alerte quando o resultado não for vazio.
-- Alert when backlog exceeds threshold or trends upward across recent batches
WITH recent_backlog AS (
SELECT
origin.flow_name,
timestamp,
DOUBLE(details:flow_progress.metrics.backlog_bytes) AS backlog_bytes,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND details:flow_progress.metrics.backlog_bytes IS NOT NULL
)
SELECT flow_name, backlog_bytes, timestamp
FROM recent_backlog
WHERE rn = 1
AND backlog_bytes > 1073741824 -- alert when backlog exceeds 1 GB
A tabela a seguir resume as condições de alerta recomendadas.
O que detectar | Como detectá-lo | Quando alertar |
|---|---|---|
Backlog crescente |
| Aumento sustentado ao longo de vários lotes |
Transmissão travada | Nenhum evento de progresso | Nenhum evento por N minutos (com base no intervalo de gatilho esperado) |
Alta latência de ingestão |
| Excede o seu limite de SLA |
Degradação da qualidade dos dados | Taxa de falha de expectativa | Porcentagem crescente de linhas com falha nas expectativas |
Evento de evolução do esquema |
| Quaisquer valores não-NULOS na contagem de violação de expectativa |
Descoberta lenta de arquivos |
| Significativamente maior que a linha de base |
Solucionar problemas comuns
A tabela a seguir descreve problemas comuns de pipeline do Auto Loader, suas prováveis causas e ações recomendadas para resolvê-los.
Problema | Possível causa | Ação recomendada |
|---|---|---|
Backlog crescendo mais rápido que o processamento | Compute subdimensionado, distorção de dados ou limites de taxa limitados | Dimensione o compute, verifique o desvio com a Spark UI e revise as configurações de |
Arquivos não sendo descobertos | Eventos de arquivo configurados incorretamente, problema de permissões ou transmissão não executada em 7 dias | Verifique as permissões de localização externa, verifique a configuração de eventos de arquivo na interface do usuário do Unity Catalog e garanta que a transmissão seja executada pelo menos a cada 7 dias para evitar a expiração do estado do RocksDB. |
Startup da transmissão demorando muito | Download de estado de checkpoint grande (RocksDB) | Atualize para o Databricks Runtime 15.3 e acima para carregamento de estado assíncrono, o que reduz o tempo de startup em ~90%. |
Processamento de arquivos duplicados | Configurações | Use um |
Evolução do esquema causando reinícios do pipeline. | Alterações frequentes ou incompatíveis de esquema | Revise |
Dados corrompidos acumulando no coletor | Problemas de qualidade de dados de origem | Verifique o |
| Em execução no Databricks Runtime abaixo de 18.2 sem | Atualize para o Databricks Runtime 18.2 e acima ou habilite |
Para solucionar problemas adicionais, consulte Perguntas frequentes sobre o Auto Loader.