Pular para o conteúdo principal

processamento de lotes com fluxos REPLACE WHERE

info

Beta

SUBSTITUIR ONDE os fluxos estão em Beta.

Esta página descreve como usar os fluxos REPLACE WHERE no pipeline declarativo LakeFlow Spark para recalcular e sobrescrever um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE, você define um predicado na tabela de destino. Todas as linhas que correspondem ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicado. As linhas que não correspondem ao predicado permanecem inalteradas.

Requisitos

Os fluxos REPLACE WHERE têm os seguintes requisitos:

  • Seu pipeline deve usar o canal PREVIEW .
  • Databricks recomenda Unity Catalog e compute serverless . refreshincremental só é suportada em compute serverless .

Quando usar fluxos REPLACE WHERE

Utilize fluxos REPLACE WHERE para os seguintes cenários:

  • Processamento incremental de lotes sem semântica de transmissão : Processar novas linhas em lotes sem gerenciar conceitos de transmissão, como marcas d'água.
  • Reprocessamento seletivo : Recalcula apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intactas.
  • Cenários que vão além das capacidades padrão view materializada :
    • Tabelas de destino com retenção mais longa do que a fonte
    • Impedir o recálculo quando uma tabela de dimensões é alterada.
    • evolução do esquema sem recalcular toda a história

Crie um fluxo SUBSTITUIR ONDE

Defina os fluxos REPLACE WHERE em SQL ou Python.

Use a cláusula FLOW REPLACE WHERE em linha com CREATE STREAMING TABLE:

SQL
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Alternativamente, use a sintaxe completa CREATE FLOW :

SQL
CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Nestes exemplos, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem. Não é necessário adicionar o predicado à consulta de origem. O mecanismo de pipeline aplica isso automaticamente ao ler da fonte.

nota

BY NAME É necessário em SQL. A correspondência entre colunas é feita por nome, e não por posição.

Dados de preenchimento históricos

Para gravar linhas históricas ou corrigidas na tabela de destino fora da atualização agendada, escolha entre dois mecanismos com base em onde os dados históricos estão armazenados:

  • Substituições de predicado : Reexecuta a consulta de origem do fluxo para um intervalo de predicado único. Utilize quando os dados históricos provêm da mesma fonte que os dados incrementais.
  • Instruções DML : Inserem dados diretamente na tabela de destino, ignorando o fluxo. Utilize esta ferramenta quando os dados históricos estiverem em uma fonte diferente dos dados incrementais.

Predicado sobrepõe

Substitua o predicado REPLACE WHERE para uma única atualização de pipeline sem modificar a definição do pipeline. As substituições de predicados são únicas, aplicam-se apenas à atualização atual e não afetam execuções futuras.

Exemplo: Carga histórica inicial

Para realizar um preenchimento único de dados históricos ao configurar um pipeline pela primeira vez:

Python
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]

resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)

Exemplo: Corrigir uma coluna para um período específico

Após atualizar a definição de uma coluna, preencha retroativamente a alteração para um intervalo histórico específico:

Python
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]

resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)

Combine múltiplas dimensões em uma única substituição de predicado:

Python
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]

Função auxiliar: start_update_with_replace_where

Use a API de atualização pipeline a partir de um Notebook para enviar substituições de predicados:

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()

body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}

if refresh_selection:
body["refresh_selection"] = refresh_selection

res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)

return StartUpdateResponse.from_dict(res)

instruções DML

Executar instruções DML diretamente na tabela de destino a partir de fora do pipeline para realizar cargas iniciais ou correções, como carregar dados de uma tabela legada:

SQL
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem mesmo após uma atualização agendada, a menos que estejam dentro do intervalo do predicado de uma execução futura.

Comportamento refresh completa

Uma refresh completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas inseridas por meio de substituições de predicados ou instruções DML fora do intervalo de predicados atual são excluídas permanentemente.

atenção

Uma refresh completa apaga todos os dados existentes e reexecuta o fluxo usando apenas o predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma refresh completa resultará em uma tabela contendo apenas os dados dos últimos 7 dias. Todas as linhas mais antigas serão excluídas permanentemente.

Para evitar a atualização completa de uma tabela, defina a propriedade da tabela pipelines.reset.allowed para false. Consulte a referência de propriedades do pipeline.

refreshincremental

Os fluxos REPLACE WHERE usam refresh incremental sempre que possível, reprocessando apenas os dados de origem que foram alterados desde a última refresh , em vez de recalcular toda a janela de substituição. refresh incremental requer compute serverless .

Quando refresh incremental se aplica

Todas as afirmações a seguir devem ser verdadeiras:

  • A execução pipeline na compute serverless .
  • O formato da consulta é suportado. Consulte a seção refreshincremental para obter informações sobre o conjunto de operadores compatíveis.
  • O predicado faz referência a colunas base de uma tabela de origem. Predicados em valores derivados, como saídas de funções agregadas ou de janela, não podem ser enviados para uma fonte, o que desativa refresh incremental.
  • Nenhuma operação DML externa modificou linhas na janela de substituição atual. Instruções DML que modificam linhas fora da janela atual não são afetadas.
  • A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se você ampliar o predicado para abranger um intervalo não processado anteriormente, essa refresh resultará em um recálculo completo. As atualizações subsequentes são elegíveis para refresh incremental novamente.
  • O predicado é determinístico. Predicados que utilizam funções não determinísticas, como rand() desativam refresh incremental. Funções temporais como current_date() são permitidas.

A primeira refresh de qualquer fluxo é sempre um cálculo completo. Caso alguma condição não seja atendida, a refresh recorre ao recálculo completo da janela de substituição atual.

Melhores práticas para refreshincremental

Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam elegíveis para refresh incremental.

Use um limite inferior móvel

Os predicados com um limite inferior móvel permanecem elegíveis para refresh incremental indefinidamente.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas previamente excluídas, acionando um fallback único para recálculo completo.

Inclua a coluna de predicado no GROUP BY.

Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa empurrar o predicado para baixo da agregação.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Se a coluna de predicado estiver ausente de GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é verificada por completo.

Inclua a coluna de predicado na chave join .

Inclua a coluna de predicado na condição join para que o mecanismo possa eliminar todas as fontes unidas.

SQL
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Se uma tabela resultante de uma junção não expuser a coluna de predicado, essa tabela será verificada por completo a cada refresh.

Diagnosticar a fallback de recálculo completo.

Quando uma refresh recorre a um recálculo completo, o motivo é relatado no evento planning_information para o fluxo. Consulte a seção "Monitorar logs de eventos do pipeline". A tabela a seguir lista os motivos relatados no evento:

Razão

Significado

EXTERNAL_CHANGE_IN_REPLACE_WINDOW

Uma operação DML externa modificou as linhas na janela de substituição atual.

REPLACE_WHERE_NOT_DETERMINISTIC

O predicado utiliza expressões não determinísticas.

PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC

A refresh anterior utilizou um predicado não determinístico.

UNSUPPORTED_REPLACE_WHERE_PREDICATE

O predicado não pode ser enviado para nenhuma fonte, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado.

Limitações

Os fluxos REPLACE WHERE têm as seguintes limitações:

  • A tabela de destino deve ser criada dentro do pipeline.
  • É permitido apenas um fluxo REPLACE WHERE por tabela de destino.
  • Uma tabela que é alvo de um fluxo REPLACE WHERE não pode ser alvo de outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de acréscimo.
  • As expectativas não são suportadas em tabelas alvo de fluxos REPLACE WHERE.
  • Para tabelas de transmissão criadas no Databricks SQL, consulte Fluxos REPLACE WHERE para tabelas de transmissão independentes para obter informações sobre diferenças de sintaxe e preenchimento retroativo.

Exemplos

Os exemplos a seguir mostram padrões de fluxo comuns do comando REPLACE WHERE.

Exemplo 1: Manter agregados históricos de uma fonte de retenção limitada

Este exemplo mantém os agregados diários indefinidamente, mesmo depois que os dados brutos expiram na tabela de origem (retenção de 3 dias):

SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Exemplo 2: Impedir o recálculo quando uma tabela de dimensões for alterada.

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos da dimensão são alterados:

SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;

Se a região de um usuário for alterada, apenas as linhas recentes serão recalculadas. As linhas históricas conservam o valor da região no momento em que foram escritas. Para corrigir linhas históricas, execute um preenchimento reverso direcionado usando substituições de predicado.

Exemplo 3: Adicionar novas métricas sem recalcular a história completa

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:
SQL
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL;
  1. Atualize a consulta para adicionar uniq_users:
SQL
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;
  1. Preencha os novos dados dos últimos 30 dias:

    Python
    overrides = [
    {
    "flow_name": "clickstream_daily",
    "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
    }
    ]

    resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["clickstream_daily"],
    )

    As linhas mais antigas que o intervalo preenchido contêm NULL para uniq_users.

Exemplo 4: Itere em uma pequena janela antes de preencher o histórico completo.

Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar todo o intervalo histórico.

Comece com um intervalo curto para que cada refresh recalcule apenas os últimos 7 dias enquanto você revisa a consulta:

SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Após a finalização da consulta, utilize uma substituição de predicado para realizar um preenchimento histórico único:

Python
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]

resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)