Pular para o conteúdo principal

SUBSTITUIR ONDE flui

info

Beta

SUBSTITUIR ONDE os fluxos estão em Beta.

Esta página descreve como usar os fluxos REPLACE WHERE no pipeline declarativo LakeFlow Spark para recalcular e sobrescrever um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.

Com um fluxo REPLACE WHERE, você define um predicado na tabela de destino. Todas as linhas que correspondem ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicado. As linhas que não correspondem ao predicado permanecem inalteradas.

Requisitos

  • Seu pipeline deve usar o canal PREVIEW .
  • Databricks recomenda Unity Catalog e compute serverless para a melhor experiência.

Quando usar fluxos REPLACE WHERE

Os fluxos REPLACE WHERE são adequados para os seguintes cenários:

  • Processamento incremental de lotes sem semântica de transmissão: Processar novas linhas em lotes sem conceitos de transmissão, como marcas d'água.
  • Reprocessamento seletivo: Recalcula apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intactas.
  • Cenários que vão além das capacidades padrão view materializada:
    • Tabelas de destino com retenção mais longa do que a fonte
    • Impedir o recálculo quando uma tabela de dimensões é alterada.
    • evolução do esquema sem recalcular toda a história

diretrizes de projeto de predicados

Evite usar predicados REPLACE WHERE em colunas agregadas ou derivadas. Por exemplo, um predicado como total_sales > 100000 onde total_sales é um SUM() exige que o mecanismo recalcule a agregação para todas as partições em cada execução. Use predicados em colunas base como date ou region para que o mecanismo possa enviar o filtro para a fonte e processar apenas os dados relevantes.

Crie um fluxo SUBSTITUIR ONDE

Você pode definir fluxos REPLACE WHERE tanto em SQL quanto em Python.

Use a cláusula FLOW REPLACE WHERE em linha com CREATE STREAMING TABLE:

SQL
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_timestamp(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Você também pode usar a sintaxe completa CREATE FLOW :

SQL
CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_timestamp(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;

Nestes exemplos, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas a partir da consulta de origem. Você não precisa adicionar o predicado à consulta de origem — o mecanismo de pipeline o aplica automaticamente ao ler da origem.

nota

BY NAME É necessário em SQL. A correspondência entre colunas é feita por nome, e não por posição.

Realizar um preenchimento retroativo com substituições de predicados

Você pode substituir o predicado REPLACE WHERE para uma única atualização de pipeline sem modificar a definição do pipeline. As substituições de predicados são únicas, aplicam-se apenas à atualização atual e não afetam execuções futuras.

Exemplo: Carga histórica inicial

Para realizar um preenchimento único de dados históricos ao configurar um pipeline pela primeira vez:

Python
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]

resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
nota

Os dados históricos devem provir da mesma fonte que os dados incrementais. Se você tiver dados históricos de uma fonte diferente, use instruções DML diretamente na tabela de destino. Consulte Preenchimento retroativo com instruções DML.

Exemplo: Corrigir uma coluna para um período específico

Após atualizar a definição de uma coluna, preencha retroativamente a alteração para um intervalo histórico específico:

Python
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_timestamp(), -30)",
}
]

resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)

Você também pode combinar várias dimensões na substituição do predicado:

Python
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_timestamp(), -30) AND region = 'asia'",
}
]

Função auxiliar: start_update_with_replace_where

Use a API de atualização pipeline a partir de um Notebook para enviar substituições de predicados:

Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()

body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}

if refresh_selection:
body["refresh_selection"] = refresh_selection

res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)

return StartUpdateResponse.from_dict(res)

Preencha com instruções DML

Você pode executar instruções DML diretamente na tabela de destino a partir de fora do pipeline para realizar cargas iniciais ou correções, como carregar dados de uma tabela legada:

SQL
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem mesmo após uma atualização agendada, a menos que estejam dentro do intervalo do predicado de uma execução futura.

Comportamento refresh completa

atenção

Uma refresh completa apaga todos os dados existentes e reexecuta o fluxo usando apenas o predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma refresh completa resultará em uma tabela contendo apenas os dados dos últimos 7 dias. Todas as linhas mais antigas serão excluídas permanentemente.

Para evitar a atualização completa de uma tabela, defina a propriedade da tabela pipelines.reset.allowed para false. Consulte a referência de propriedades do pipeline.

Limitações

  • A tabela de destino deve ser criada dentro do pipeline.
  • É permitido apenas um fluxo REPLACE WHERE por tabela de destino.
  • Uma tabela que é alvo de um fluxo REPLACE WHERE não pode ser alvo de outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de acréscimo.
  • As expectativas não são suportadas em tabelas alvo de fluxos REPLACE WHERE.
  • Para tabelas de transmissão criadas em Databricks SQL, consulte Fluxos REPLACE WHERE em Databricks SQL para obter informações sobre diferenças de sintaxe e preenchimento retroativo.

Exemplos

Mantenha os agregados históricos provenientes de uma fonte de retenção limitada.

Este exemplo mantém os agregados diários indefinidamente, mesmo depois que os dados brutos expiram na tabela de origem (retenção de 3 dias):

SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Impedir o recálculo quando uma tabela de dimensões for alterada.

Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos da dimensão são alterados:

SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;

Se a região de um usuário for alterada, apenas as linhas recentes serão recalculadas. As linhas históricas conservam o valor da região no momento em que foram escritas. Você pode corrigir linhas históricas posteriormente com um preenchimento retroativo direcionado usando substituições de predicado.

Adicionar novas métricas sem recalcular a história completa

Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:

  1. Defina a tabela inicial:

    SQL
    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
  2. Atualize a consulta para adicionar uniq_users:

    SQL
    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
    event_date,
    page_id,
    COUNT(*) AS clicks,
    COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
  3. Preencha os novos dados dos últimos 30 dias:

    Python
    overrides = [
    {
    "flow_name": "clickstream_daily",
    "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
    }
    ]

    resp = start_update_with_replace_where(
    pipeline_id="<pipeline-id>",
    replace_where_overrides=overrides,
    refresh_selection=["clickstream_daily"],
    )

    As linhas mais antigas que o intervalo preenchido contêm NULL para uniq_users.