Pular para o conteúdo principal

REPLACE WHERE fluxos para tabelas de transmissão autônomas

info

Beta

Os fluxos REPLACE WHERE para tabelas de transmissão independentes estão em versão Beta.

Esta página descreve como usar fluxos REPLACE WHERE para recalcular e sobrescrever um subconjunto específico de uma tabela de transmissão independente sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE, você define um predicado na tabela de destino. Todas as linhas que correspondem ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicado. As linhas que não correspondem ao predicado permanecem inalteradas.

Requisitos

Os fluxos REPLACE WHERE têm os seguintes requisitos:

Quando usar fluxos REPLACE WHERE

Utilize fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento incremental de lotes sem semântica de transmissão : Processar novas linhas em lotes sem gerenciar conceitos de transmissão, como marcas d'água.
  • Reprocessamento seletivo : Recalcula apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intactas.
  • Cenários que vão além das capacidades padrão view materializada :
    • Tabelas de destino com retenção mais longa do que a fonte
    • Impedir o recálculo quando uma tabela de dimensões é alterada.
    • evolução do esquema sem recalcular toda a história

Crie um fluxo SUBSTITUIR ONDE

Use a cláusula FLOW REPLACE WHERE em linha com CREATE OR REFRESH STREAMING TABLE:

SQL
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Durante refresh, todas as linhas da tabela de destino que correspondem ao predicado são excluídas, a consulta de origem é recalculada para o mesmo intervalo de predicado e os novos resultados são inseridos. Neste exemplo, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem.

Não é necessário adicionar o predicado à consulta de origem. O mecanismo de pipeline aplica isso automaticamente ao ler da fonte.

nota

BY NAME é necessário. Isso garante que as colunas sejam correspondidas por nome, e não por posição.

Dados de preenchimento históricos

Para realizar preenchimentos retroativos, execute instruções DML diretamente na tabela de destino:

SQL
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Comportamento refresh completa

Uma refresh completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas inseridas por instruções DML fora do intervalo do predicado atual são excluídas permanentemente.

atenção

Uma refresh completa apaga todos os dados existentes e reexecuta o fluxo usando apenas o predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma refresh completa resultará em uma tabela contendo apenas os dados dos últimos 7 dias. Todas as linhas mais antigas serão excluídas permanentemente.

SQL
REFRESH STREAMING TABLE orders_enriched FULL;

Para evitar a atualização completa de uma tabela, defina a propriedade da tabela pipelines.reset.allowed como false:

SQL
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...

refreshincremental

Os fluxos REPLACE WHERE usam refresh incremental sempre que possível, reprocessando apenas os dados de origem que foram alterados desde a última refresh , em vez de recalcular toda a janela de substituição. refresh incremental requer compute serverless .

Quando refresh incremental se aplica

Todas as afirmações a seguir devem ser verdadeiras:

  • A execução pipeline na compute serverless .
  • O formato da consulta é suportado. Consulte a seção refreshincremental para obter informações sobre o conjunto de operadores compatíveis.
  • O predicado faz referência a colunas base de uma tabela de origem. Predicados em valores derivados, como saídas de funções agregadas ou de janela, não podem ser enviados para uma fonte, o que desativa refresh incremental.
  • Nenhuma operação DML externa modificou linhas na janela de substituição atual. Instruções DML que modificam linhas fora da janela atual não são afetadas.
  • A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se você ampliar o predicado para abranger um intervalo não processado anteriormente, essa refresh resultará em um recálculo completo. As atualizações subsequentes são elegíveis para refresh incremental novamente.
  • O predicado é determinístico. Predicados que utilizam funções não determinísticas, como rand() desativam refresh incremental. Funções temporais como current_date() são permitidas.

A primeira refresh de qualquer fluxo é sempre um cálculo completo. Caso alguma condição não seja atendida, a refresh recorre ao recálculo completo da janela de substituição atual.

Melhores práticas para refreshincremental

Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam elegíveis para refresh incremental.

Use um limite inferior móvel

Os predicados com um limite inferior móvel permanecem elegíveis para refresh incremental indefinidamente.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas previamente excluídas, acionando um fallback único para recálculo completo.

Inclua a coluna de predicado no GROUP BY.

Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa empurrar o predicado para baixo da agregação.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna de predicado estiver ausente de GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é verificada por completo.

Inclua a coluna de predicado na chave join .

Inclua a coluna de predicado na condição join para que o mecanismo possa eliminar todas as fontes unidas.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela resultante de uma junção não expuser a coluna de predicado, essa tabela será verificada por completo a cada refresh.

Diagnosticar a fallback de recálculo completo.

Quando uma refresh recorre a um recálculo completo, o motivo é relatado no evento planning_information para o fluxo. Consulte a seção "Monitorar logs de eventos do pipeline". A tabela a seguir lista os motivos relatados no evento:

Razão

Significado

EXTERNAL_CHANGE_IN_REPLACE_WINDOW

Uma operação DML externa modificou as linhas na janela de substituição atual.

REPLACE_WHERE_NOT_DETERMINISTIC

O predicado utiliza expressões não determinísticas.

PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC

A refresh anterior utilizou um predicado não determinístico.

UNSUPPORTED_REPLACE_WHERE_PREDICATE

O predicado não pode ser enviado para nenhuma fonte, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado.

Exemplos

Os exemplos a seguir mostram padrões de fluxo comuns do comando REPLACE WHERE.

Exemplo 1: Manter agregados históricos de uma fonte de retenção limitada

Este exemplo mantém os agregados diários indefinidamente, mesmo depois que os dados brutos expiram na tabela de origem (retenção de 3 dias):

SQL
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Exemplo 2: Impedir o recálculo quando uma tabela de dimensões for alterada.

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos da dimensão são alterados:

SQL
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;

Se a região de um usuário for alterada, apenas as linhas recentes serão recalculadas. As linhas históricas conservam o valor da região no momento em que foram escritas.

Exemplo 3: Adicionar novas métricas sem recalcular a história completa

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:

    SQL
    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
  2. Atualize a consulta para adicionar uniq_users:

    SQL
    CREATE OR REFRESH STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks,
    COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;

    Linhas mais antigas que a janela de 7 dias contêm NULL para uniq_users.

Exemplo 4: Itere em uma pequena janela antes de preencher o histórico completo.

Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar todo o intervalo histórico.

Comece com uma janela curta para validar as métricas e iterar na lógica de negócios com custos compute mais baixos:

SQL
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Uma janela curta recalcula apenas os últimos 7 dias a cada refresh, portanto, revise a consulta quantas vezes forem necessárias antes de confirmar uma execução histórica completa.

Após a consulta ser finalizada, utilize DML para preencher todo o intervalo histórico:

SQL
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;