Pular para o conteúdo principal

Monitore o progresso do gateway de ingestão com logsde eventos.

Aplica-se a : Ícone X vermelho Conectores SaaS Ícone de visto verde Conectores de banco de dados

Aprenda a usar logs de eventos para monitorar o progresso dos gateways de ingestão no Tempo Real. logs de eventos fornecem informações por tabela para as fases de snapshot e captura de dados de alterações (CDC), permitindo rastrear a integridade pipeline , identificar pipelines paralisados e criar soluções de monitoramento automatizadas.

Os eventos de progresso permitem que você:

  • Acompanhe quantas linhas e bytes foram ingeridos por tabela sem esperar pela conclusão do pipeline.
  • Monitore o progresso do Snapshot para cada tabela a fim de estimar a conclusão de grandes cargas iniciais.
  • Monitore cada tabela de dados inseridos individualmente para identificar gargalos ou problemas.
  • Receba eventos mesmo quando não houver alterações nos dados para confirmar que o pipeline está em execução.
  • Crie alertas e painéis usando dados de eventos estruturados em vez de analisar logs.

Como funcionam os eventos de progresso

O gateway emite os seguintes tipos de eventos em intervalos regulares (default: 5 minutos) para cada tabela em seu pipeline:

  • Os eventos flow_progress relatam contadores de linhas e bytes para fluxos de Snapshot e CDC . As métricas nesses eventos são deltas. Eles são zerados após cada emissão.
  • operation_progress eventos relatam o progresso do Snapshot em porcentagem. Os fluxos Snapshot emitem esses eventos além de flow_progress. A porcentagem de progresso é cumulativa. Acumula-se de 0 para 100 ao longo da vida útil do Snapshot.

Cada evento inclui:

  • Nomes das tabelas de origem e destino.
  • Métricas por tabela: linhas inseridas e atualizadas, linhas excluídas (somente CDC ), bytes de saída e porcentagem de progresso (para Snapshot).
  • Quando o evento foi gerado.

Os eventos estão disponíveis na tabela log de eventos, mas não por meio APIs públicas. Você pode consultar a tabela log de eventos usando SQL para analisar o comportamento pipeline e criar soluções de monitoramento.

Acesse os eventos de progresso

Os eventos de progresso são armazenados na tabela log de eventos. Para acessá-los:

  1. Navegue até o seu gateway no workspace Databricks .
  2. Clique na tab " logeventos" para view os eventos na interface do usuário.
  3. Consulte a tabela log de eventos diretamente usando SQL para uma análise detalhada.

Consultar a tabela log de eventos

Para consultar eventos flow_progress para contadores de linhas e bytes:

SQL
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) AS table_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS rows_upserted,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS rows_deleted,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'snapshot'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'cdc'
ELSE 'unknown'
END AS ingestion_phase
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

Para consultar os eventos operation_progress para obter a porcentagem de progresso do Snapshot:

SQL
SELECT
timestamp,
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

Substitua <pipeline-id> pelo seu ID de gateway.

Compreenda a estrutura do evento.

Os eventos de progresso usam um dos seguintes tipos de evento com o nível log METRICS :

  • flow_progressEmitido tanto para fluxos de Snapshot quanto para fluxos CDC . Relata as diferenças por linha e por byte em cada tabela.
  • operation_progress: Emitido apenas para fluxos de Snapshot. Exibe a porcentagem de conclusão do instantâneo para uma tabela.

Os exemplos a seguir mostram a estrutura JSON para cada tipo de evento:

Estrutura de eventos de fluxo Snapshot

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": null,
"num_output_bytes": 458752000
}
}
},
"maturity_level": "STABLE"
}

Estrutura de eventos de fluxo do CDC

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3,
"num_output_bytes": 18432
}
}
},
"maturity_level": "STABLE"
}

Estrutura de eventos de progresso de operações Snapshot

JSON
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "operation_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Snapshot in progress for 'main.sales.customers'.",
"details": {
"operation_progress": {
"type": "CDC_SNAPSHOT",
"status": "IN_PROGRESS",
"duration_ms": 3600000,
"progress_percent": 65.5,
"cdc_snapshot": {
"target_table_name": "main.sales.customers",
"snapshot_timestamp": 1737542400000,
"snapshot_reason": "NEW_TABLE"
}
}
},
"maturity_level": "STABLE"
}

Campos de evento

A tabela a seguir descreve os key campos nos eventos em andamento:

campo

Tipo

Descrição

event_type

String

Pode ser flow_progress (contadores de linha e byte para Snapshot e CDC) ou operation_progress (percentagem de progresso do Snapshot).

level

String

Sempre METRICS.

timestamp

String

Carimbo de data/hora ISO 8601 que indica quando o evento foi gerado.

origin.pipeline_type

String

Sempre INGESTION_GATEWAY.

origin.pipeline_name

String

Nome do portal.

origin.dataset_name

String

Nome da tabela que está sendo incluída.

origin.catalog_name

String

Nome do Unity Catalog .

origin.schema_name

String

Nome do esquema Unity Catalog .

origin.flow_name

String

Identificador de fluxo que indica a fase de ingestão. Formato: {catalog}.{schema}.{table}_snapshot_flow para carga inicial ou {catalog}.{schema}.{table}_cdc_flow para alterações incrementais.

origin.ingestion_source_type

String

Tipo de banco de dados de origem (por exemplo, SQLSERVER, MYSQL, POSTGRESQL, ORACLE).

details:flow_progress.status

String

Estado atual do fluxo, normalmente RUNNING.

details:flow_progress.metrics.num_upserted_rows

Integer

Número de linhas inseridas ou atualizadas desde o último evento. Delta meso. Reiniciar para zero após cada emissão.

details:flow_progress.metrics.num_deleted_rows

Integer

Número de linhas excluídas desde o último evento. Delta meso. Reiniciar para zero após cada emissão. null para fluxos de Snapshot (Snapshot não exclui).

details:flow_progress.metrics.num_output_bytes

Integer

Número de bytes compactados enviados para um volume desde o último evento. Delta meso. Reiniciar para zero após cada emissão. Preenchido para os fluxos Snapshot e CDC .

details:operation_progress.type

String

tipo de operações. CDC_SNAPSHOT para operações de Snapshot.

details:operation_progress.status

String

Situação atual. IN_PROGRESS enquanto o Snapshot estiver em execução, COMPLETED quando terminar. Outros valores incluem STARTED, CANCELED e FAILED.

details:operation_progress.duration_ms

Integer

Tempo total decorrido das operações em milissegundos.

details:operation_progress.progress_percent

Double

Percentagem de conclusão do Snapshot (0.0 - 100.0). Cumulativo, não delta. O valor aumenta à medida que os blocos são concluídos e atinge 100.0 quando o Snapshot termina.

details:operation_progress.cdc_snapshot.target_table_name

String

Nome completo da tabela que está sendo capturada.

maturity_level

String

Sempre STABLE.

comportamento de lodo

As métricas de progresso se enquadram nas seguintes categorias:

Delta métricas (num_upserted_rows, num_deleted_rows, num_output_bytes):

  • Representam as mudanças ocorridas desde o último evento, e não os totais acumulados.
  • Reset para zero após cada emissão de evento.
  • São emitidos mesmo quando não ocorrem alterações nos dados, servindo como indicadores de atividade.
  • Para fluxos de Snapshot, num_deleted_rows é null porque o Snapshot não produz exclusões.

Métricas cumulativas (progress_percent):

  • O valor acumula de 0.0 para 100.0 ao longo da vida útil de um Snapshot.
  • Atualizações conforme os fragmentos do Snapshot são concluídos. Tabelas pequenas que não são divididas em vários blocos saltam de 0.0 diretamente para 100.0 entre emissões. Tabelas grandes divididas em várias partes são atualizadas gradualmente à medida que cada parte é processada, fornecendo um sinal aproximado de progresso.
  • O valor de metros é uma aproximação para snapshots grandes/fragmentados, e não uma contagem exata em nível de linha.

Configurar eventos de progresso

Os eventos de progresso estão ativados por default para novos gateways. Você pode personalizar o comportamento dos eventos usando parâmetros de configuração do pipeline.

Ativar ou desativar eventos de progresso

JSON
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}

Defina como "false" para desativar os eventos de progresso.

Ajustar a frequência de emissão do evento

JSON
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}

Padrão: 300 segundos (cinco minutos). Intervalo válido: 30 a 3600 segundos (30 segundos a uma hora). Esta configuração controla a cadência dos eventos flow_progress e operation_progress .

Exemplo de configuração de gateway

O exemplo a seguir mostra uma configuração completa de gateway com eventos de progresso ativados e configurados para serem emitidos a cada cinco minutos:

Python
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}

Comportamentos e limitações importantes

comportamento padrão

  • O recurso está habilitado por default para todos os novos gateways.
  • Os pipelines existentes recebem automaticamente esse recurso na próxima atualização ou reinicialização.
  • Nenhuma ação é necessária para ativar os eventos de progresso.

Considerações sobre o tempo

  • A primeira emissão pode levar até o intervalo de frequência configurado (default: cinco minutos) após o início pipeline para que os eventos de progresso apareçam.
  • Os eventos são emitidos na frequência configurada durante a ingestão ativa.

Metodologia de atualização zero

  • Os eventos são emitidos para todas as tabelas, incluindo aquelas com zero atualizações.

  • As métricas de atualização zero ajudam a distinguir entre:

    • Tabelas Parado: Processadas, mas nenhuma alteração nos dados ocorreu.
    • Tabelas não processadas: Ainda não foram selecionadas pelo pipeline.
  • Eventos de atualização zero servem como sinais de atividade, confirmando que o pipeline está em execução.

Comportamento percentual do progressoSnapshot

  • O progresso Snapshot é calculado como (completed_chunks / total_chunks) × 100. Os valores apresentados são aproximados, não uma porcentagem exata por linha.
  • Tabelas que não são divididas em vários blocos (normalmente tabelas menores) saltam de 0.0 diretamente para 100.0 entre emissões porque há apenas um bloco para rastrear.
  • Tabelas grandes divididas em vários blocos são atualizadas incrementalmente à medida que cada bloco é concluído, fornecendo um sinal de progresso gradual que é útil para monitorar cargas iniciais de longa duração.
  • Um status COMPLETED sempre reporta progress_percent = 100.0.
  • A medida não persiste após uma refresh ou reinicialização pipeline . Após uma reinicialização, o progresso do Snapshot é retomado a partir do último ponto de verificação confirmado e as métricas continuam a subir a partir da posição retomada.

Exemplos de consultas

As consultas de exemplo a seguir mostram como monitorar seu gateway usando as métricas de progresso de linha, byte e instantâneo. Substitua <pipeline-id> pelo seu ID de gateway em cada consulta.

consultas de contagem de linhas

Volume por mesa (últimas 24 horas)

Total de operações de upsert e delete para cada tabela nas últimas 24 horas, com a fase de ingestão classificada pelo sufixo do nome do fluxo. Use isso como título de um painel para ver quais tabelas movimentaram mais dados.

SQL
WITH row_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts,
details:flow_progress:metrics:num_deleted_rows::bigint AS deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(upserts) AS rows_upserted_24h,
SUM(COALESCE(deletes, 0)) AS rows_deleted_24h,
SUM(upserts) + SUM(COALESCE(deletes, 0)) AS total_rows_moved_24h
FROM row_events
GROUP BY flow_name, phase
ORDER BY total_rows_moved_24h DESC

Eventos recentes de progresso (última hora)

Contadores de linhas recentes para todas as tabelas em seu pipeline. Útil para monitoramento quase em tempo real.

SQL
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress:metrics:num_upserted_rows::bigint AS num_upserted_rows,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS num_deleted_rows,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC

Identifique mesas silenciosas ou travadas

Tabelas que emitem eventos, mas que reportam zero inserções e zero exclusões nos últimos 60 minutos, são candidatas ao status "travadas". Investigue mais a fundo se a fonte deve ser alterada. Tabelas CDC que são genuinamente parado (por exemplo, durante a noite) também aparecem aqui.

SQL
WITH recent_window AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
COUNT(*) AS emissions_in_window,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS upserts_in_window,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS deletes_in_window,
MAX(timestamp) AS last_event
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 60 MINUTES
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
emissions_in_window,
upserts_in_window,
deletes_in_window,
last_event,
ROUND(TIMESTAMPDIFF(MINUTE, last_event, current_timestamp()), 0) AS minutes_since_last_event
FROM recent_window
WHERE upserts_in_window = 0
AND deletes_in_window = 0
ORDER BY minutes_since_last_event DESC

Cronograma por mesa com totais acumulados

Histórico completo, evento por evento, de um único fluxo nas últimas 24 horas, com contagem cumulativa de linhas. Substitua <flow-pattern> por um padrão SQL LIKE (por exemplo, '%customers%_cdc_flow').

SQL
SELECT
origin.flow_name AS flow_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress:metrics:num_upserted_rows::bigint AS upserts_this_period,
COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0) AS deletes_this_period,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint)
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_upserts_this_run,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0))
OVER (PARTITION BY origin.flow_name, origin.update_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_deletes_this_run
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '<flow-pattern>'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp

Consultas de bytes de saída

Bytes por tabela (últimas 24 horas)

Total de bytes enviados para um volume por tabela nas últimas 24 horas, com unidades de fácil compreensão em MB e GB. Ordene em ordem decrescente para visualizar as tabelas com maior volume de dados.

SQL
WITH byte_events AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
details:flow_progress:metrics:num_output_bytes::bigint AS output_bytes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
)
SELECT
flow_name,
phase,
SUM(output_bytes) AS bytes_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0, 2) AS mb_24h,
ROUND(SUM(output_bytes) / 1024.0 / 1024.0 / 1024.0, 3) AS gb_24h
FROM byte_events
GROUP BY flow_name, phase
ORDER BY bytes_24h DESC

Tendência da taxa de transferência (MB por minuto)

Série temporal por minuto do volume de bytes enviados em todos os fluxos nas últimas 24 horas. Renderize como um gráfico de linhas para detectar padrões e paralisações da Taxa de transferência.

SQL
SELECT
DATE_TRUNC('MINUTE', timestamp) AS ts_minute,
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
ROUND(SUM(details:flow_progress:metrics:num_output_bytes::bigint) / 1024.0 / 1024.0, 2) AS mb_per_minute
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY DATE_TRUNC('MINUTE', timestamp), origin.flow_name
ORDER BY origin.flow_name, ts_minute

Média de bytes por linha (detector de tabela ampla ou LOB)

junte num_output_bytes com contagens de linhas para compute a média de bytes por linha por tabela. Valores altos geralmente indicam tabelas LOB ou de esquema amplo que impulsionam o custo de transferência. Útil para planejamento de capacidade e revisão de esquemas.

SQL
WITH joined AS (
SELECT
origin.flow_name AS flow_name,
CASE
WHEN origin.flow_name LIKE '%_snapshot_flow' THEN 'SNAPSHOT'
WHEN origin.flow_name LIKE '%_cdc_flow' THEN 'CDC'
ELSE 'OTHER'
END AS phase,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS total_bytes,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS total_upserts,
SUM(COALESCE(details:flow_progress:metrics:num_deleted_rows::bigint, 0)) AS total_deletes
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.flow_name
)
SELECT
flow_name,
phase,
total_upserts + total_deletes AS total_rows,
ROUND(total_bytes / 1024.0 / 1024.0, 2) AS total_mb,
ROUND(total_bytes / NULLIF(total_upserts + total_deletes, 0), 0) AS avg_bytes_per_row
FROM joined
WHERE total_bytes > 0
ORDER BY avg_bytes_per_row DESC

Consultas de progressoSnapshot

Progresso geral do Snapshot

Um resumo em uma única linha de quantas tabelas estão concluídas, em andamento ou na fila na execução atual do Snapshot. Fixe este widget no painel principal para uma verificação rápida de integridade.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
latest_per_table AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
COUNT(*) AS total_tables,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS tables_completed,
SUM(CASE WHEN status = 'IN_PROGRESS' AND progress_pct > 0 AND progress_pct < 100 THEN 1 ELSE 0 END) AS tables_in_progress,
SUM(CASE WHEN progress_pct = 0 THEN 1 ELSE 0 END) AS tables_not_started,
ROUND(AVG(progress_pct), 2) AS overall_progress_pct,
ROUND(SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 1) AS pct_tables_done
FROM latest_per_table
WHERE rn = 1

Quadro de status de instantâneos por mesa

Cada tabela na execução atual do Snapshot, com seu status mais recente e porcentagem de progresso. Ordene por progress_pct para ver quais tabelas precisam de mais trabalho.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
table_status AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:status::string AS status,
details:operation_progress:progress_percent::double AS progress_pct,
timestamp,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
)
SELECT
flow_name,
status,
ROUND(progress_pct, 2) AS progress_pct,
timestamp AS last_update,
CASE
WHEN status = 'COMPLETED' THEN 'Done'
WHEN progress_pct = 0 THEN 'Queued'
ELSE 'Active'
END AS phase
FROM table_status
WHERE rn = 1
ORDER BY
CASE status WHEN 'IN_PROGRESS' THEN 0 WHEN 'COMPLETED' THEN 1 ELSE 2 END,
progress_pct ASC

Snapshot de linhas e bytes carregados na atualização atual.

Combina a porcentagem de progresso, o número de linhas inseridas e o número de bytes enviados para a execução atual do Snapshot. Use isso para renderizar blocos com o texto "X% concluído, N linhas, N MB" em um painel.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
progress AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
ROW_NUMBER() OVER (PARTITION BY origin.flow_name ORDER BY timestamp DESC) AS rn
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
),
volume AS (
SELECT
origin.flow_name AS flow_name,
SUM(details:flow_progress:metrics:num_upserted_rows::bigint) AS rows_loaded,
SUM(details:flow_progress:metrics:num_output_bytes::bigint) AS bytes_loaded
FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.flow_name LIKE '%_snapshot_flow'
AND origin.update_id = (SELECT update_id FROM latest_update)
GROUP BY origin.flow_name
)
SELECT
p.flow_name,
p.status,
ROUND(p.progress_pct, 2) AS progress_pct,
v.rows_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0, 2) AS mb_loaded,
ROUND(v.bytes_loaded / 1024.0 / 1024.0 / 1024.0, 3) AS gb_loaded
FROM progress p
LEFT JOIN volume v ON p.flow_name = v.flow_name
WHERE p.rn = 1
ORDER BY p.progress_pct ASC

Detecção de instantâneo travada

Snapshot de tabelas cujo progress_percent não foi alterado nos últimos 30 minutos. Programe isso como uma consulta de alerta para capturar Snapshots que foram interrompidos, mas ainda estão emitindo eventos.

SQL
WITH latest_update AS (
SELECT origin.update_id AS update_id
FROM event_log('<pipeline-id>')
WHERE event_type = 'create_update'
ORDER BY timestamp DESC LIMIT 1
),
recent AS (
SELECT
origin.flow_name AS flow_name,
details:operation_progress:progress_percent::double AS progress_pct,
details:operation_progress:status::string AS status,
timestamp
FROM event_log('<pipeline-id>')
WHERE event_type = 'operation_progress'
AND level = 'METRICS'
AND origin.update_id = (SELECT update_id FROM latest_update)
AND timestamp >= current_timestamp() - INTERVAL 30 MINUTES
)
SELECT
flow_name,
ROUND(MIN(progress_pct), 2) AS min_pct_30min,
ROUND(MAX(progress_pct), 2) AS max_pct_30min,
ROUND(MAX(progress_pct) - MIN(progress_pct), 2) AS pct_change_30min,
COUNT(*) AS events_in_window,
MAX(timestamp) AS last_event_ts
FROM recent
WHERE status = 'IN_PROGRESS'
GROUP BY flow_name
HAVING MAX(progress_pct) - MIN(progress_pct) = 0
AND MAX(progress_pct) < 100
ORDER BY max_pct_30min ASC

Solução de problemas

Nenhum evento de progresso é exibido.

Se você não visualizar eventos de progresso no log de eventos:

  1. Verifique se pipelines.gateway.progressEventsEnabled está definido como "true".
  2. Aguarde pelo menos um intervalo completo após o início pipeline . O valor padrão é de cinco minutos.
  3. Verifique se o pipeline está em execução e recebendo dados.
  4. Inclua o filtro level = 'METRICS' para ver apenas os eventos de progresso.

Os eventos ocorrem com muita frequência ou pouca frequência.

Se os eventos não ocorrerem com a frequência esperada:

Verifique a configuração pipelines.gateway.progressEventEmitFrequencySeconds e ajuste conforme necessário:

  • O valor padrão é de cinco minutos (300 segundos).
  • Intervalo válido: 30 a 3600 segundos. Ajuste conforme necessário.

As métricas mostram zero após a reinicialização do pipeline.

Se os valores forem zerados após a reinicialização pipeline :

Os números ficam armazenados apenas na memória e são redefinidos ao reiniciar, refresh ou retomar a operação. Isso é intencional para simplificar a implementação. O pipeline começará a acumular novas informações imediatamente.

Faltam métricas para algumas tabelas

Se algumas tabelas não mostrarem eventos de progresso:

  1. Certifique-se de que a tabela não esteja filtrada na configuração do pipeline.
  2. Para a fase CDC , certifique-se de que a tabela de origem tenha CDC ou o acompanhamento de alterações ativado.
  3. Confirme se a tabela está incluída na configuração do gateway.
  4. Observe que progress_percent só é emitido em eventos operation_progress para fluxos de Snapshot. Os fluxos do CDC não emitem eventos operation_progress porque o CDC não tem conceito de conclusão.

Recursos adicionais