Monitore o progresso do gateway de ingestão com logsde eventos.
Aplica-se a : Conectores SaaS
Conectores de banco de dados
Aprenda a usar logs de eventos para monitorar o progresso dos gateways de ingestão no Tempo Real. logs de eventos fornecem informações por tabela para as fases de snapshot e captura de dados de alterações (CDC), permitindo rastrear a integridade pipeline , identificar pipelines paralisados e criar soluções de monitoramento automatizadas.
Os eventos de progresso permitem que você:
- Acompanhe quantas linhas e bytes foram ingeridos por tabela sem esperar pela conclusão do pipeline.
- Monitore o progresso do Snapshot para cada tabela a fim de estimar a conclusão de grandes cargas iniciais.
- Estimar quando um Snapshot de longa duração será concluído usando o ETA por tabela.
- Meça a latência de descoberta de CDC de ponta a ponta (commit de origem para emissão de log de eventos) por tabela.
- Monitore cada tabela de dados inseridos individualmente para identificar gargalos ou problemas.
- Receba eventos mesmo quando não houver alterações nos dados para confirmar que o pipeline está em execução.
- Crie alertas e painéis usando dados de eventos estruturados em vez de analisar logs.
Como funcionam os eventos de progresso
O gateway emite os seguintes tipos de eventos em intervalos regulares (default: 5 minutos) para cada tabela em seu pipeline:
flow_progresseventos informam contadores de linha e byte para fluxos de Snapshot e CDC. As métricas nesses eventos são deltas. Eles são zerados após cada emissão. Para fluxos de CDC, esses eventos também incluem métricas de latência que medem a latência de descoberta de ponta a ponta e o desempenho do pipeline de upload.operation_progresseventos relatam o progresso do Snapshot como uma porcentagem. Fluxos de Snapshot emitem esses eventos além deflow_progress. A porcentagem de progresso é cumulativa. Acumula-se de0a100ao longo do tempo de vida do Snapshot. Esses eventos também incluem o tempo estimado restante (estimated_completion_ms) até a conclusão do snapshot.
Cada evento inclui:
- Nomes das tabelas de origem e destino.
- Métricas por tabela: linhas inseridas e atualizadas, linhas excluídas (somente CDC ), bytes de saída e porcentagem de progresso (para Snapshot).
- Para fluxos de CDC, métricas de latência incluindo latência de descoberta e tempo de processamento de lotes.
- Para snapshots, o tempo estimado restante até a conclusão.
- Quando o evento foi gerado.
Os eventos estão disponíveis na tabela log de eventos, mas não por meio APIs públicas. Você pode consultar a tabela log de eventos usando SQL para analisar o comportamento pipeline e criar soluções de monitoramento.
Acesse os eventos de progresso
Os eventos de progresso são armazenados na tabela log de eventos. Para acessá-los:
- Navegue até o seu gateway no workspace Databricks .
- Clique na tab " logeventos" para view os eventos na interface do usuário.
- Consulte a tabela log de eventos diretamente usando SQL para uma análise detalhada.
Consultar a tabela log de eventos
Para consultar eventos flow_progress para contadores de linhas e bytes:
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
Para consultar os eventos operation_progress para obter a porcentagem de progresso do Snapshot:
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
Substitua <pipeline-id> pelo seu ID de gateway.
Compreenda a estrutura do evento.
Os eventos de progresso usam um dos seguintes tipos de evento com o nível log METRICS :
flow_progressEmitido tanto para fluxos de Snapshot quanto para fluxos CDC . Relata as diferenças por linha e por byte em cada tabela.operation_progress: Emitido apenas para fluxos de Snapshot. Exibe a porcentagem de conclusão do instantâneo para uma tabela.
Os exemplos a seguir mostram a estrutura JSON para cada tipo de evento:
Estrutura de eventos de fluxo Snapshot
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}
Estrutura de eventos de fluxo do CDC
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
},
"streaming_metrics": {
"discovery_latency_ms": 12450,
"batch_processing_time_ms": 8100,
"event_time": {
"max": "2025-10-14T13:33:45.000Z"
}
}
}
},
"maturity_level": "STABLE"
}
Estrutura de eventos de progresso de operações Snapshot
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"estimated_completion_ms": 1885000,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}
Campos de evento
A tabela a seguir descreve os key campos nos eventos em andamento:
campo | Tipo | Descrição |
|---|---|---|
| String | Pode ser |
| String | Sempre |
| String | Carimbo de data/hora ISO 8601 que indica quando o evento foi gerado. |
| String | Sempre |
| String | Nome do portal. |
| String | Nome da tabela que está sendo incluída. |
| String | Nome do Unity Catalog . |
| String | Nome do esquema Unity Catalog . |
| String | Identificador de fluxo que indica a fase de ingestão. Formato: |
| String | Tipo de banco de dados de origem (por exemplo, |
| String | Estado atual do fluxo, normalmente |
| Integer | Número de linhas inseridas ou atualizadas desde o último evento. Delta meso. Reiniciar para zero após cada emissão. |
| Integer | Número de linhas excluídas desde o último evento. Delta meso. Reiniciar para zero após cada emissão. |
| Integer | Número de bytes compactados enviados para um volume desde o último evento. Delta meso. Reiniciar para zero após cada emissão. Preenchido para os fluxos Snapshot e CDC . |
| Integer | Tempo em milissegundos entre a alteração da fonte em |
| Integer | Tempo em milissegundos que o gateway gastou lendo e fazendo upload do lote de alterações mais recente. Não inclui atraso da origem. Apenas fluxos de CDC. Pode ser |
| String | Timestamp ISO 8601 da alteração de origem mais recente que o gateway leu para esta tabela. Somente fluxos do CDC. |
| String | tipo de operações. |
| String | Situação atual. |
| Integer | Tempo total decorrido das operações em milissegundos. |
| Double | Percentagem de conclusão do Snapshot ( |
| Integer | Estimativa de tempo restante em milissegundos até que o Snapshot termine. Diminui conforme o Snapshot progride e atinge |
| String | Nome completo da tabela que está sendo capturada. |
| String | Sempre |
comportamento de lodo
As métricas de progresso se enquadram nas seguintes categorias:
Delta métricas (num_upserted_rows, num_deleted_rows, num_output_bytes):
- Representam as mudanças ocorridas desde o último evento, e não os totais acumulados.
- Reset para zero após cada emissão de evento.
- São emitidos mesmo quando não ocorrem alterações nos dados, servindo como indicadores de atividade.
- Para fluxos de Snapshot,
num_deleted_rowsénullporque o Snapshot não produz exclusões.
Métricas cumulativas (progress_percent):
- O valor acumula de
0.0para100.0ao longo da vida útil de um Snapshot. - Atualizações conforme o Snapshot progride. Para tabelas pequenas, o valor pode saltar diretamente de
0.0para100.0. Para tabelas grandes, o valor é atualizado gradualmente e é uma aproximação, não uma contagem exata de linhas.
Métricas pontuais discovery_latency_ms``batch_processing_time_ms``event_time.max``estimated_completion_ms(,,,):
- Cada valor reflete o estado da métrica no momento em que o evento foi emitido.
discovery_latency_msebatch_processing_time_msaplicam-se apenas aos fluxos do CDC. Os valores são relatados como0se o resultado for negativo de outra forma.event_time.maxAplica-se somente a fluxos de CDC. O valor é o timestamp da alteração de origem mais recente que o gateway leu.estimated_completion_msAplica-se apenas a fluxos de Snapshot. O valor diminui à medida que o Snapshot progride e atinge0na conclusão.
Configurar eventos de progresso
Os eventos de progresso estão ativados por default para novos gateways. Você pode personalizar o comportamento dos eventos usando parâmetros de configuração do pipeline.
Ativar ou desativar eventos de progresso
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}
Defina como "false" para desativar os eventos de progresso.
Ajustar a frequência de emissão do evento
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}
Padrão: 300 segundos (cinco minutos). Intervalo válido: 30 a 3600 segundos (30 segundos a uma hora). Esta configuração controla a cadência dos eventos flow_progress e operation_progress .
Exemplo de configuração de gateway
O exemplo a seguir mostra uma configuração completa de gateway com eventos de progresso ativados e configurados para serem emitidos a cada cinco minutos:
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}
Comportamentos e limitações importantes
comportamento padrão
- O recurso está habilitado por default para todos os novos gateways.
- Os pipelines existentes recebem automaticamente esse recurso na próxima atualização ou reinicialização.
- Nenhuma ação é necessária para ativar os eventos de progresso.
Disponibilidade de métricas entre versões do gateway
Snapshot ETA (estimated_completion_ms) e métricas de latência do CDC (streaming_metrics) exigem uma imagem de gateway da versão do gateway de ingestão de maio de 2026 ou posterior. O Databricks seleciona a imagem do gateway automaticamente. Não é possível configurar isso manualmente. Os recursos atingiram todas as regiões de produção em maio de 2026.
Tanto pipelines recém-criados quanto existentes recebem a nova imagem. Os pipelines existentes o adotam automaticamente na sua próxima atualização ou reinício, de modo que não é preciso migrar.
Para verificar se o seu pipeline emite os campos estimated_completion_ms e streaming_metrics, execute a seguinte consulta. Se ambas as colunas retornarem linhas, seu gateway oferece suporte a métricas de latência de Snapshot ETA e CDC:
SELECT
MAX(CASE WHEN details:operation_progress:estimated_completion_ms IS NOT NULL
THEN timestamp END) AS last_snapshot_eta,
MAX(CASE WHEN details:flow_progress:streaming_metrics IS NOT NULL
THEN timestamp END) AS last_cdc_latency
FROM event_log('<pipeline-id>')
WHERE event_type IN ('operation_progress', 'flow_progress')
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
Se nenhuma das colunas tiver um registro de data e hora recente depois de um intervalo de emissão completo (default: cinco minutos) após uma reinicialização do pipeline, entre em contato com o suporte do Databricks para confirmar se os recursos estão habilitados em sua região.
Considerações sobre o tempo
- A primeira emissão pode levar até o intervalo de frequência configurado (default: cinco minutos) após o início pipeline para que os eventos de progresso apareçam.
- Os eventos são emitidos na frequência configurada durante a ingestão ativa.
Metodologia de atualização zero
-
Os eventos são emitidos para todas as tabelas, incluindo aquelas com zero atualizações.
-
As métricas de atualização zero ajudam a distinguir entre:
- Tabelas Parado: Processadas, mas nenhuma alteração nos dados ocorreu.
- Tabelas não processadas: Ainda não foram selecionadas pelo pipeline.
-
Eventos de atualização zero servem como sinais de atividade, confirmando que o pipeline está em execução.
Comportamento percentual do progressoSnapshot
- O progresso Snapshot é calculado como
(completed_chunks / total_chunks) × 100. Os valores apresentados são aproximados, não uma porcentagem exata por linha. - Tabelas que não são divididas em vários blocos (normalmente tabelas menores) saltam de
0.0diretamente para100.0entre emissões porque há apenas um bloco para rastrear. - Tabelas grandes divididas em vários blocos são atualizadas incrementalmente à medida que cada bloco é concluído, fornecendo um sinal de progresso gradual que é útil para monitorar cargas iniciais de longa duração.
- Um status
COMPLETEDsempre reportaprogress_percent = 100.0. - A medida não persiste após uma refresh ou reinicialização pipeline . Após uma reinicialização, o progresso do Snapshot é retomado a partir do último ponto de verificação confirmado e as métricas continuam a subir a partir da posição retomada.
Exemplos de consultas
As seguintes consultas de exemplo mostram como monitorar seu gateway usando contagens de linhas, bytes de saída, progresso e ETA de Snapshot e métricas de latência de CDC. Substitua <pipeline-id> pelo seu ID do gateway em cada consulta.
O Notebook Monitor de Progresso do Gateway de Ingestão contém todas as queries nesta seção. Importe o Notebook no workspace onde seu gateway executa, e especifique o ID do seu gateway. Execute o Notebook para inspecionar o progresso do Snapshot, a latência do CDC e a contagem de linhas. Você também pode ajustar os limites de SLA para atender aos seus requisitos de monitoramento.
consultas de contagem de linhas
Volume por mesa (últimas 24 horas)
Total de operações de upsert e delete para cada tabela nas últimas 24 horas, com a fase de ingestão classificada pelo sufixo do nome do fluxo. Use isso como título de um painel para ver quais tabelas movimentaram mais dados.
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC
Eventos recentes de progresso (última hora)
Contadores de linhas recentes para todas as tabelas em seu pipeline. Útil para monitoramento quase em tempo real.
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
Identifique mesas silenciosas ou travadas
Tabelas que emitem eventos, mas que reportam zero inserções e zero exclusões nos últimos 60 minutos, são candidatas ao status "travadas". Investigue mais a fundo se a fonte deve ser alterada. Tabelas CDC que são genuinamente parado (por exemplo, durante a noite) também aparecem aqui.
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC
Cronograma por mesa com totais acumulados
Histórico completo, evento por evento, de um único fluxo nas últimas 24 horas, com contagem cumulativa de linhas. Substitua <flow-pattern> por um padrão SQL LIKE (por exemplo, '%customers%_cdc_flow').
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp
Consultas de bytes de saída
Bytes por tabela (últimas 24 horas)
Total de bytes enviados para um volume por tabela nas últimas 24 horas, com unidades de fácil compreensão em MB e GB. Ordene em ordem decrescente para visualizar as tabelas com maior volume de dados.
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC
Tendência da taxa de transferência (MB por minuto)
Série temporal por minuto do volume de bytes enviados em todos os fluxos nas últimas 24 horas. Renderize como um gráfico de linhas para detectar padrões e paralisações da Taxa de transferência.
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute
Média de bytes por linha (detector de tabela ampla ou LOB)
junte num_output_bytes com contagens de linhas para compute a média de bytes por linha por tabela. Valores altos geralmente indicam tabelas LOB ou de esquema amplo que impulsionam o custo de transferência. Útil para planejamento de capacidade e revisão de esquemas.
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC
Consultas de progressoSnapshot
Progresso geral do Snapshot
A consulta a seguir retorna um resumo de linha única de quantas tabelas estão concluídas, em andamento ou em fila de espera. Os resultados refletem o estado mais recente reportado por tabela em todas as atualizações do pipeline na janela de retenção do log de eventos, portanto, as tabelas concluídas em uma atualização anterior continuam a contar para tables_completed após um refresh ou reinício.
WITH latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1
Quadro de status de instantâneos por mesa
A consulta a seguir retorna cada tabela no pipeline com seu status mais recente relatado e porcentagem de progresso. Os resultados incluem tabelas concluídas em atualizações anteriores, portanto, nenhuma tabela aparece como linhas ausentes.
WITH table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC
Snapshot de linhas e bytes carregados na atualização atual.
A consulta a seguir combina porcentagem de progresso, linhas upsertadas e bytes uploads para cada tabela na execução do Snapshot atual.
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC
Detecção de instantâneo travada
A seguinte consulta retorna tabelas snapshot cujo progress_percent não foi alterado nos últimos 30 minutos. Utilizado para identificar Snapshots que foram paralisados, mas ainda estão ativos.
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC
Snapshot ETA por tabela
A seguinte consulta retorna a porcentagem de progresso atual e o tempo estimado para conclusão para cada tabela na execução do Snapshot ativa. Somente tabelas com status de IN_PROGRESS são retornadas. Para incluir tabelas concluídas e em fila, remova o filtro status = 'IN_PROGRESS'.
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_flow AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:estimated_completion_ms::bigint AS eta_ms,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
eta_ms,
ROUND(eta_ms / 1000.0 / 60.0, 1) AS eta_minutes,
ROUND(eta_ms / 1000.0 / 3600.0, 2) AS eta_hours,
timestamp AS last_update
FROM latest_per_flow
WHERE rn = 1
AND status = 'IN_PROGRESS'
ORDER BY eta_ms DESC NULLS LAST
consultas de latência do CDC
Atualização do CDC por tabela
Para cada fluxo de CDC, o gateway reporta múltiplos campos de observabilidade dentro de streaming_metrics. Permitem saber se uma tabela está atualizada, quais tabelas estão atrasadas e se o atraso é do lado da origem ou do lado do gateway.
Estas são as latências do gateway de ingestão. Eles medem o caminho do banco de dados de origem através do gateway até o volume do Unity Catalog. Eles não incluem a latência do aplicador downstream do volume do Unity Catalog para a tabela de destino. Essa etapa é observada separadamente no log de eventos do aplicador.
campo | Descrição |
|---|---|
| Timestamp ISO 8601 da alteração mais recente que o gateway leu do banco de dados de origem para esta tabela. Se este valor parar de mudar entre eventos, a origem parou de produzir alterações ou o gateway não consegue mais ler da origem. |
| Tempo em milissegundos entre a alteração na origem em |
| Tempo em milissegundos que o gateway gastou lendo e fazendo upload do lote de alterações mais recente. Não inclui atrasos por parte do banco de dados de origem. Para estimar o atraso do banco de dados de origem, subtraia este valor de |
A relação entre discovery_latency_ms e batch_processing_time_ms indica onde no caminho do gateway (do banco de dados de origem para o volume do Unity Catalog) a latência está concentrada:
Padrão | O que significa |
|---|---|
Ambos pequenos | CDC está em execução e atualizado. |
| Atraso do lado da fonte. O gateway está lendo prontamente. Commits chegaram atrasados da origem (atraso de replicação, backlog de log da origem, transações de origem de longa duração). |
Ambos altos | Atraso no Gateway. O pipeline de upload é o gargalo. Verifique os recursos de compute do gateway e o caminho de rede para o volume do Unity Catalog. |
| Problema na fonte por tabela (DDL em andamento, contenção de bloqueio, slot de replicação bloqueado, alteração de esquema). |
| Problema em todo o Gateway (recursos, conectividade de volume, atraso do log CDC de origem afetando todas as capturas). |
| A origem está parado, ou o gateway perdeu a conectividade com o banco de dados de origem. Verifique a habilitação do CDC de origem e os logs do gateway. |
A seguinte consulta retorna os últimos 30 minutos de eventos de atualização do CDC para cada tabela do CDC. Com o intervalo default de cinco minutos, isso produz aproximadamente seis linhas por tabela. Os resultados são ordenados por discovery_latency_ms para que as tabelas com mais atraso apareçam primeiro.
SELECT
origin.flow_name AS flow_name,
details:flow_progress:streaming_metrics:event_time:max::string AS latest_source_commit_seen,
details:flow_progress:streaming_metrics:discovery_latency_ms::bigint AS discovery_latency_ms,
details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint AS batch_processing_time_ms,
timestamp AS emission_ts
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
ORDER BY discovery_latency_ms DESC NULLS LAST, flow_name, emission_ts DESC
Série temporal de latência do CDC
A consulta a seguir retorna a média horária de discovery_latency_ms e batch_processing_time_ms para cada fluxo de CDC nas últimas 24 horas. Utilize-o para identificar quando o frescor se degradou. Para pipelines com muitos fluxos de CDC, filtre por nome do fluxo para limitar o conjunto de resultados.
SELECT
DATE_TRUNC('HOUR', timestamp) AS ts_hour,
origin.flow_name AS flow_name,
ROUND(AVG(details:flow_progress:streaming_metrics:discovery_latency_ms::bigint) / 1000.0, 2) AS avg_discovery_latency_sec,
ROUND(AVG(details:flow_progress:streaming_metrics:batch_processing_time_ms::bigint) / 1000.0, 2) AS avg_batch_processing_sec
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name IS NOT NULL
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND origin.flow_name LIKE '%_cdc_flow'
AND details:flow_progress:streaming_metrics IS NOT NULL
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('HOUR', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_hour
Solução de problemas
Nenhum evento de progresso é exibido.
Se você não visualizar eventos de progresso no log de eventos:
- Verifique se
pipelines.gateway.progressEventsEnabledestá definido como"true". - Aguarde pelo menos um intervalo completo após o início pipeline . O valor padrão é de cinco minutos.
- Verifique se o pipeline está em execução e recebendo dados.
- Inclua o filtro
level = 'METRICS'para ver apenas os eventos de progresso.
Os eventos ocorrem com muita frequência ou pouca frequência.
Se os eventos não ocorrerem com a frequência esperada:
Verifique a configuração pipelines.gateway.progressEventEmitFrequencySeconds e ajuste conforme necessário:
- O valor padrão é de cinco minutos (300 segundos).
- Intervalo válido: 30 a 3600 segundos. Ajuste conforme necessário.
As métricas mostram zero após a reinicialização do pipeline.
Se os valores forem zerados após a reinicialização pipeline :
Os números ficam armazenados apenas na memória e são redefinidos ao reiniciar, refresh ou retomar a operação. Isso é intencional para simplificar a implementação. O pipeline começará a acumular novas informações imediatamente.
Faltam métricas para algumas tabelas
Se algumas tabelas não mostrarem eventos de progresso:
- Certifique-se de que a tabela não esteja filtrada na configuração do pipeline.
- Para a fase CDC , certifique-se de que a tabela de origem tenha CDC ou o acompanhamento de alterações ativado.
- Confirme se a tabela está incluída na configuração do gateway.
- Observe que
progress_percentsó é emitido em eventosoperation_progresspara fluxos de Snapshot. Os fluxos do CDC não emitem eventosoperation_progressporque o CDC não tem conceito de conclusão.
Campos estimated_completion_ms ou streaming_metrics ausentes
Se event_log linhas existirem, mas o objeto estimated_completion_ms ou streaming_metrics estiver ausente, consulte Disponibilidade de métricas entre as versões do gateway para a consulta de diagnóstico e os requisitos mínimos do gateway.