Monitore o progresso do gateway de ingestão com logsde eventos.
Aplica-se a : Conectores de banco de dados
Aprenda a usar logs de eventos para monitorar o progresso dos gateways de ingestão no Tempo Real. logs de eventos fornecem informações por tabela para as fases de snapshot e captura de dados de alterações (CDC), permitindo rastrear a integridade pipeline , identificar pipelines paralisados e criar soluções de monitoramento automatizadas.
Os eventos de progresso permitem que você:
- Acompanhe a quantidade de dados ingeridos por tabela sem precisar esperar a conclusão do pipeline.
- Monitore cada tabela de dados inseridos individualmente para identificar gargalos ou problemas.
- Receba eventos mesmo quando não houver alterações nos dados para confirmar que o pipeline está em execução.
- Crie alertas e painéis usando dados de eventos estruturados em vez de analisar logs.
Como funcionam os eventos de progresso
O gateway emite flow_progress eventos em intervalos regulares (default: 5 minutos) para cada tabela em seu pipeline. Cada evento inclui:
- Nomes das tabelas de origem e destino.
- Número de linhas inseridas e excluídas desde o último evento.
- Quando o evento foi gerado.
Os eventos estão disponíveis na tabela log de eventos, mas não por meio APIs públicas. Você pode consultar a tabela log de eventos usando SQL para analisar o comportamento pipeline e criar soluções de monitoramento.
Acesse os eventos de progresso
Os eventos de progresso são armazenados na tabela log de eventos. Para acessá-los:
- Navegue até o seu gateway no workspace Databricks .
- Clique na tab " logeventos" para view os eventos na interface do usuário.
- Consulte a tabela log de eventos diretamente usando SQL para uma análise detalhada.
Consultar a tabela log de eventos
Para consultar eventos de progresso usando SQL:
SELECT
timestamp,
CONCAT(origin.catalog_name, '.', origin.schema_name, '.', origin.dataset_name) as table_name,
details:flow_progress.metrics.num_upserted_rows as rows_upserted,
details:flow_progress.metrics.num_deleted_rows as rows_deleted,
CASE
WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN 'cdc'
WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN 'snapshot'
ELSE 'unknown'
END as ingestion_phase
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC
Substitua <pipeline-id> pelo seu ID de gateway.
Compreenda a estrutura do evento.
Os eventos de progresso usam o tipo de evento flow_progress com nível log METRICS . Os exemplos a seguir mostram a estrutura JSON para eventos de progresso de Snapshot e CDC :
Estrutura do evento de progresso Snapshot
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:14.175Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_snapshot_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_snapshot_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 7512704,
"num_deleted_rows": 0
}
}
},
"maturity_level": "STABLE"
}
Estrutura de eventos de progresso do CDC
{
"id": "01234567-89ab-cdef-0123-456789abcdef",
"timestamp": "2025-10-14T13:33:57.426Z",
"level": "METRICS",
"event_type": "flow_progress",
"origin": {
"pipeline_type": "INGESTION_GATEWAY",
"pipeline_name": "MyPipeline",
"dataset_name": "customers",
"catalog_name": "main",
"schema_name": "sales",
"flow_name": "main.sales.customers_cdc_flow",
"ingestion_source_type": "SQLSERVER"
},
"message": "Completed a streaming update of 'main.sales.customers_cdc_flow'.",
"details": {
"flow_progress": {
"status": "RUNNING",
"metrics": {
"num_upserted_rows": 25,
"num_deleted_rows": 3
}
}
},
"maturity_level": "STABLE"
}
Campos de evento
A tabela a seguir descreve os key campos nos eventos em andamento:
campo | Tipo | Descrição |
|---|---|---|
| String | Sempre |
| String | Sempre |
| String | Carimbo de data/hora ISO 8601 que indica quando o evento foi gerado. |
| String | Sempre |
| String | Nome do portal. |
| String | Nome da tabela que está sendo incluída. |
| String | Nome do Unity Catalog . |
| String | Nome do esquema Unity Catalog . |
| String | Identificador de fluxo que indica a fase de ingestão. Formato: |
| String | Tipo de banco de dados de origem (por exemplo, |
| String | Estado atual do fluxo, normalmente |
| Integer | Número de linhas inseridas ou atualizadas desde o último evento. |
| Integer | Número de linhas excluídas desde o último evento. |
| String | Sempre |
comportamento de lodo
- A contagem de linhas representa as alterações desde o último evento, e não os totais acumulados.
- A contagem é zerada após cada emissão de evento.
- Os eventos são emitidos mesmo quando não ocorrem alterações nos dados, servindo como indicadores de atividade.
Configurar eventos de progresso
Os eventos de progresso estão ativados por default para novos gateways. Você pode personalizar o comportamento dos eventos usando parâmetros de configuração do pipeline.
Ativar ou desativar eventos de progresso
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true"
}
Defina como "false" para desativar os eventos de progresso.
Ajustar a frequência de emissão do evento
"configuration": {
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
}
Padrão: 300 segundos (cinco minutos). Intervalo válido: 30 a 3600 segundos (30 segundos a 1 hora).
Exemplo de configuração de gateway
O exemplo a seguir mostra uma configuração completa de gateway com eventos de progresso ativados e configurados para serem emitidos a cada cinco minutos:
gateway_pipeline_spec = {
"pipeline_type": "INGESTION_GATEWAY",
"name": "my_gateway_pipeline",
"catalog": "main",
"target": "my_schema",
"continuous": True,
"configuration": {
"pipelines.gateway.progressEventsEnabled": "true",
"pipelines.gateway.progressEventEmitFrequencySeconds": "300"
},
# ... rest of pipeline spec
}
Comportamentos e limitações importantes
comportamento padrão
- O recurso está habilitado por default para todos os novos gateways.
- Os pipelines existentes recebem automaticamente esse recurso na próxima atualização ou reinicialização.
- Nenhuma ação é necessária para ativar os eventos de progresso.
Considerações sobre o tempo
- A primeira emissão pode levar até o intervalo de frequência configurado (default: cinco minutos) após o início pipeline para que os eventos de progresso apareçam.
- Os eventos são emitidos na frequência configurada durante a ingestão ativa.
Metodologia de atualização zero
-
Os eventos são emitidos para todas as tabelas, incluindo aquelas com zero atualizações.
-
As métricas de atualização zero ajudam a distinguir entre:
- Tabelas Parado: Processadas, mas nenhuma alteração nos dados ocorreu.
- Tabelas não processadas: Ainda não foram selecionadas pelo pipeline.
-
Eventos de atualização zero servem como sinais de atividade, confirmando que o pipeline está em execução.
Exemplos de consultas
Veja os eventos de progresso recentes
Veja os eventos de progresso recentes para todas as tabelas em seu pipeline:
SELECT
origin.pipeline_name,
origin.dataset_name,
origin.flow_name,
details:flow_progress.metrics.num_upserted_rows,
details:flow_progress.metrics.num_deleted_rows,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND origin.pipeline_type = 'INGESTION_GATEWAY'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
Substitua <pipeline-id> pelo seu ID de gateway.
Agregar por tabela
Calcule o total de operações de upsert e delete para cada tabela durante um determinado período:
SELECT
origin.dataset_name,
COUNT(*) as event_count,
SUM(details:flow_progress.metrics.num_upserted_rows) as total_upserts,
SUM(details:flow_progress.metrics.num_deleted_rows) as total_deletes,
MIN(timestamp) as first_event,
MAX(timestamp) as last_event
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 24 HOURS
GROUP BY origin.dataset_name
ORDER BY total_upserts DESC
Identificar mesas de paraíso
Identificar tabelas com zero atualizações para distinguir tabelas parado de tabelas paralisadas:
SELECT
origin.dataset_name,
origin.flow_name,
timestamp
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND details:flow_progress.metrics.num_upserted_rows = 0
AND details:flow_progress.metrics.num_deleted_rows = 0
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY timestamp DESC
Monitorar a frequência de emissão
Verifique se os eventos estão sendo emitidos na frequência esperada:
SELECT
origin.dataset_name,
timestamp,
LEAD(timestamp) OVER (PARTITION BY origin.dataset_name ORDER BY timestamp) - timestamp as interval_seconds
FROM event_log(`<pipeline-id>`)
WHERE event_type = 'flow_progress'
AND level = 'METRICS'
AND timestamp > current_timestamp() - INTERVAL 1 HOUR
ORDER BY origin.dataset_name, timestamp
Solução de problemas
Nenhum evento de progresso é exibido.
Se você não visualizar eventos de progresso no log de eventos:
- Verifique se
pipelines.gateway.progressEventsEnabledestá definido como"true". - Aguarde pelo menos um intervalo completo após o início pipeline . O valor padrão é de cinco minutos.
- Verifique se o pipeline está em execução e recebendo dados.
- Inclua o filtro
level = 'METRICS'para ver apenas os eventos de progresso.
Os eventos ocorrem com muita frequência ou pouca frequência.
Se os eventos não ocorrerem com a frequência esperada:
Verifique a configuração pipelines.gateway.progressEventEmitFrequencySeconds e ajuste conforme necessário:
- O valor padrão é de cinco minutos (300 segundos).
- Intervalo válido: 30 a 3600 segundos. Ajuste conforme necessário.
As métricas mostram zero após a reinicialização do pipeline.
Se os valores forem zerados após a reinicialização pipeline :
Os números ficam armazenados apenas na memória e são redefinidos ao reiniciar, refresh ou retomar a operação. Isso é intencional para simplificar a implementação. O pipeline começará a acumular novas informações imediatamente.
Faltam métricas para algumas tabelas
Se algumas tabelas não mostrarem eventos de progresso:
- Certifique-se de que a tabela não esteja filtrada na configuração do pipeline.
- Para a fase CDC , certifique-se de que a tabela de origem tenha CDC ou o acompanhamento de alterações ativado.
- Confirme se a tabela está incluída na configuração do gateway.