Pular para o conteúdo principal

SUBSTITUIR fluxos WHERE em Databricks SQL

info

Beta

Os fluxos REPLACE WHERE para tabelas de transmissão no Databricks SQL estão em versão Beta.

Esta página descreve como usar fluxos REPLACE WHERE para recalcular e sobrescrever um subconjunto específico de uma tabela de transmissão no Databricks SQL sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE, você define um predicado na tabela de destino. Todas as linhas que correspondem ao predicado são excluídas, em seguida, a consulta de origem é reavaliada para esse intervalo de predicado e os novos resultados são inseridos. As linhas que não correspondem ao predicado permanecem inalteradas.

Para obter a referência completa do recurso, incluindo a sintaxe Python e as substituições de predicados para preenchimentos retroativos, consulte os fluxos REPLACE WHERE.

Requisitos

Antes de criar um fluxo REPLACE WHERE no Databricks SQL, confirme o seguinte:

  • Sua tabela de transmissão deve usar o canal PREVIEW . Defina o canal usando a propriedade de tabela pipelines.channel :

    SQL
    CREATE STREAMING TABLE st_preview
    TBLPROPERTIES (pipelines.channel = 'PREVIEW')
    ...
  • Databricks recomenda Unity Catalog e compute serverless para a melhor experiência.

Quando usar fluxos REPLACE WHERE

Os fluxos REPLACE WHERE são adequados para os seguintes cenários:

  • Processamento incremental de lotes sem semântica de transmissão: Processar novas linhas em lotes sem gerenciar conceitos de transmissão, como marcas d'água.
  • Reprocessamento seletivo: Recalcula apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intactas.
  • Cenários que vão além das capacidades padrão view materializada:
    • Tabelas de destino com retenção mais longa do que a fonte
    • Impedir o recálculo quando uma tabela de dimensões é alterada.
    • evolução do esquema sem recalcular toda a história

diretrizes de projeto de predicados

Evite usar predicados REPLACE WHERE em colunas agregadas ou derivadas. Por exemplo, um predicado como total_sales > 100000 onde total_sales é um SUM() exige que o mecanismo recalcule a agregação para todas as partições em cada execução. Use predicados em colunas base como date ou region para que o mecanismo possa enviar o filtro para a fonte e processar apenas os dados relevantes.

Crie um fluxo SUBSTITUIR ONDE

Use a cláusula FLOW REPLACE WHERE em linha com CREATE OR REFRESH STREAMING TABLE:

SQL
CREATE OR REFRESH STREAMING TABLE orders_enriched
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Durante refresh, todas as linhas da tabela de destino que correspondem ao predicado são excluídas, a consulta de origem é recalculada para o mesmo intervalo de predicado e os novos resultados são inseridos. Neste exemplo, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas a partir da consulta de origem.

Não é necessário adicionar o predicado à consulta de origem. O mecanismo de pipeline aplica isso automaticamente ao ler da fonte.

nota

BY NAME é necessário. A correspondência entre colunas é feita por nome, e não por posição.

nota

A sintaxe CREATE FLOW e a sintaxe Python não são suportadas para tabelas de transmissão criadas no Databricks SQL. Defina a cláusula REPLACE WHERE diretamente na instrução CREATE OR REFRESH STREAMING TABLE .

Preencha com instruções DML

A substituição de predicados não é suportada para tabelas de transmissão criadas no Databricks SQL. Para realizar preenchimentos retroativos, como carregar dados históricos, corrigir uma coluna para um período específico ou carregar dados de uma tabela legada, execute instruções DML diretamente na tabela de destino.

SQL
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem mesmo após uma atualização agendada, a menos que estejam dentro do intervalo do predicado de uma execução futura.

Para pipelines que suportam substituições de predicados, consulte Executar um preenchimento reverso com substituições de predicados.

Comportamento refresh completa

Uma refresh completa em um fluxo REPLACE WHERE executa novamente apenas o intervalo do predicado, e não a consulta de origem completa. Leia o aviso abaixo antes de executar uma refresh completa.

atenção

Uma refresh completa apaga todos os dados existentes e reexecuta o fluxo usando apenas o predicado definido. Se uma tabela estiver sendo atualizada há um ano com um predicado de 7 dias, uma refresh completa resultará em uma tabela contendo apenas os dados dos últimos 7 dias. Todas as linhas mais antigas serão excluídas permanentemente.

SQL
REFRESH STREAMING TABLE orders_enriched FULL;

Para evitar a atualização completa de uma tabela, defina a propriedade da tabela pipelines.reset.allowed como false:

SQL
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
...

Exemplos

Os exemplos a seguir mostram padrões comuns de fluxo REPLACE WHERE no Databricks SQL.

Mantenha os agregados históricos provenientes de uma fonte de retenção limitada.

Este exemplo mantém os agregados diários indefinidamente, mesmo depois que os dados brutos expiram na tabela de origem, que tem um período de retenção de 3 dias:

SQL
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Impedir o recálculo quando uma tabela de dimensões for alterada.

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos da dimensão são alterados:

SQL
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;

Se a região de um usuário for alterada, apenas as linhas recentes serão recalculadas. As linhas históricas conservam o valor da região no momento em que foram escritas.

Adicionar novas métricas sem recalcular a história completa

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:

    SQL
    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
  2. Atualize a consulta para adicionar uniq_users:

    SQL
    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks,
    COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
  3. Preencha a nova métrica usando DML:

    SQL
    INSERT INTO clickstream_daily
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks,
    COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    WHERE event_date BETWEEN '2026-01-01' AND '2026-01-30'
    GROUP BY ALL;

    As linhas mais antigas que o intervalo preenchido contêm NULL para uniq_users.