processamento de lotes com fluxos REPLACE WHERE
Beta
SUBSTITUIR ONDE os fluxos estão em Beta.
Esta página descreve como usar os fluxos REPLACE WHERE no pipeline declarativo LakeFlow Spark para recalcular e sobrescrever um subconjunto específico de uma tabela sem reprocessar todo o histórico da tabela. Os fluxos REPLACE WHERE lidam com dados que chegam com atraso, reprocessamento a montante, evolução do esquema e preenchimentos retroativos.
Com um fluxo REPLACE WHERE, você define um predicado na tabela de destino. Todas as linhas que correspondem ao predicado são excluídas e substituídas pela reavaliação da consulta de origem para o mesmo intervalo de predicado. As linhas que não correspondem ao predicado permanecem inalteradas.
Requisitos
Os fluxos REPLACE WHERE têm os seguintes requisitos:
- Seu pipeline deve usar o canal
PREVIEW. - Databricks recomenda Unity Catalog e compute serverless . refreshincremental só é suportada em compute serverless .
Quando usar fluxos REPLACE WHERE
Utilize fluxos REPLACE WHERE para os seguintes cenários:
- Processamento incremental de lotes sem semântica de transmissão : Processar novas linhas em lotes sem gerenciar conceitos de transmissão, como marcas d'água.
- Reprocessamento seletivo : Recalcula apenas as linhas que correspondem a um predicado, deixando todas as outras linhas intactas.
- Cenários que vão além das capacidades padrão view materializada :
- Tabelas de destino com retenção mais longa do que a fonte
- Impedir o recálculo quando uma tabela de dimensões é alterada.
- evolução do esquema sem recalcular toda a história
Crie um fluxo SUBSTITUIR ONDE
Defina os fluxos REPLACE WHERE em SQL ou Python.
- SQL
- Python
Use a cláusula FLOW REPLACE WHERE em linha com CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternativamente, use a sintaxe completa CREATE FLOW :
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Em Python, a tabela e o fluxo são definidos em uma única instrução. O fluxo herda o mesmo nome da tabela:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
O parâmetro replace_where aceita uma expressão de coluna PySpark ou um predicado de strings.
Nestes exemplos, todas as linhas dos últimos 7 dias são excluídas de orders_enriched e recalculadas usando a consulta de origem. Não é necessário adicionar o predicado à consulta de origem. O mecanismo de pipeline aplica isso automaticamente ao ler da fonte.
BY NAME É necessário em SQL. A correspondência entre colunas é feita por nome, e não por posição.
Dados de preenchimento históricos
Para gravar linhas históricas ou corrigidas na tabela de destino fora da atualização agendada, escolha entre dois mecanismos com base em onde os dados históricos estão armazenados:
- Substituições de predicado : Reexecuta a consulta de origem do fluxo para um intervalo de predicado único. Utilize quando os dados históricos provêm da mesma fonte que os dados incrementais.
- Instruções DML : Inserem dados diretamente na tabela de destino, ignorando o fluxo. Utilize esta ferramenta quando os dados históricos estiverem em uma fonte diferente dos dados incrementais.
Predicado sobrepõe
Substitua o predicado REPLACE WHERE para uma única atualização de pipeline sem modificar a definição do pipeline. As substituições de predicados são únicas, aplicam-se apenas à atualização atual e não afetam execuções futuras.
Exemplo: Carga histórica inicial
Para realizar um preenchimento único de dados históricos ao configurar um pipeline pela primeira vez:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Exemplo: Corrigir uma coluna para um período específico
Após atualizar a definição de uma coluna, preencha retroativamente a alteração para um intervalo histórico específico:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combine múltiplas dimensões em uma única substituição de predicado:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Função auxiliar: start_update_with_replace_where
start_update_with_replace_whereUse a API de atualização pipeline a partir de um Notebook para enviar substituições de predicados:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
instruções DML
Executar instruções DML diretamente na tabela de destino a partir de fora do pipeline para realizar cargas iniciais ou correções, como carregar dados de uma tabela legada:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
As linhas inseridas por meio de DML não estão sujeitas ao predicado REPLACE WHERE e persistem mesmo após uma atualização agendada, a menos que estejam dentro do intervalo do predicado de uma execução futura.
Comportamento refresh completa
Uma refresh completa de um fluxo REPLACE WHERE reexecuta a consulta de origem usando apenas o predicado atual. As linhas inseridas por meio de substituições de predicados ou instruções DML fora do intervalo de predicados atual são excluídas permanentemente.
Uma refresh completa apaga todos os dados existentes e reexecuta o fluxo usando apenas o predicado definido. Se um pipeline estiver em execução há um ano com um predicado de 7 dias, uma refresh completa resultará em uma tabela contendo apenas os dados dos últimos 7 dias. Todas as linhas mais antigas serão excluídas permanentemente.
Para evitar a atualização completa de uma tabela, defina a propriedade da tabela pipelines.reset.allowed para false. Consulte a referência de propriedades do pipeline.
refreshincremental
Os fluxos REPLACE WHERE usam refresh incremental sempre que possível, reprocessando apenas os dados de origem que foram alterados desde a última refresh , em vez de recalcular toda a janela de substituição. refresh incremental requer compute serverless .
Quando refresh incremental se aplica
Todas as afirmações a seguir devem ser verdadeiras:
- A execução pipeline na compute serverless .
- O formato da consulta é suportado. Consulte a seção refreshincremental para obter informações sobre o conjunto de operadores compatíveis.
- O predicado faz referência a colunas base de uma tabela de origem. Predicados em valores derivados, como saídas de funções agregadas ou de janela, não podem ser enviados para uma fonte, o que desativa refresh incremental.
- Nenhuma operação DML externa modificou linhas na janela de substituição atual. Instruções DML que modificam linhas fora da janela atual não são afetadas.
- A janela de substituição atual não inclui linhas que o predicado anterior excluiu. Se você ampliar o predicado para abranger um intervalo não processado anteriormente, essa refresh resultará em um recálculo completo. As atualizações subsequentes são elegíveis para refresh incremental novamente.
- O predicado é determinístico. Predicados que utilizam funções não determinísticas, como
rand()desativam refresh incremental. Funções temporais comocurrent_date()são permitidas.
A primeira refresh de qualquer fluxo é sempre um cálculo completo. Caso alguma condição não seja atendida, a refresh recorre ao recálculo completo da janela de substituição atual.
Melhores práticas para refreshincremental
Siga estas diretrizes para que os fluxos REPLACE WHERE permaneçam elegíveis para refresh incremental.
Use um limite inferior móvel
Os predicados com um limite inferior móvel permanecem elegíveis para refresh incremental indefinidamente.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Um limite superior móvel, como date BETWEEN date_add(current_date(), -7) AND current_date(), pode deslocar a janela para incluir linhas previamente excluídas, acionando um fallback único para recálculo completo.
Inclua a coluna de predicado no GROUP BY.
Ao agregar, inclua a coluna de predicado em GROUP BY para que o mecanismo possa empurrar o predicado para baixo da agregação.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Se a coluna de predicado estiver ausente de GROUP BY, o predicado não pode ser colocado abaixo da agregação e a fonte é verificada por completo.
Inclua a coluna de predicado na chave join .
Inclua a coluna de predicado na condição join para que o mecanismo possa eliminar todas as fontes unidas.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Se uma tabela resultante de uma junção não expuser a coluna de predicado, essa tabela será verificada por completo a cada refresh.
Diagnosticar a fallback de recálculo completo.
Quando uma refresh recorre a um recálculo completo, o motivo é relatado no evento planning_information para o fluxo. Consulte a seção "Monitorar logs de eventos do pipeline". A tabela a seguir lista os motivos relatados no evento:
Razão | Significado |
|---|---|
| Uma operação DML externa modificou as linhas na janela de substituição atual. |
| O predicado utiliza expressões não determinísticas. |
| A refresh anterior utilizou um predicado não determinístico. |
| O predicado não pode ser enviado para nenhuma fonte, a janela atual inclui linhas não processadas pelo predicado anterior ou a execução usa uma substituição de predicado. |
Limitações
Os fluxos REPLACE WHERE têm as seguintes limitações:
- A tabela de destino deve ser criada dentro do pipeline.
- É permitido apenas um fluxo REPLACE WHERE por tabela de destino.
- Uma tabela que é alvo de um fluxo REPLACE WHERE não pode ser alvo de outro tipo de fluxo, como um fluxo AUTO CDC ou um fluxo de acréscimo.
- As expectativas não são suportadas em tabelas alvo de fluxos REPLACE WHERE.
- Para tabelas de transmissão criadas no Databricks SQL, consulte Fluxos REPLACE WHERE para tabelas de transmissão independentes para obter informações sobre diferenças de sintaxe e preenchimento retroativo.
Exemplos
Os exemplos a seguir mostram padrões de fluxo comuns do comando REPLACE WHERE.
Exemplo 1: Manter agregados históricos de uma fonte de retenção limitada
Este exemplo mantém os agregados diários indefinidamente, mesmo depois que os dados brutos expiram na tabela de origem (retenção de 3 dias):
- SQL
- Python
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Exemplo 2: Impedir o recálculo quando uma tabela de dimensões for alterada.
Este exemplo mantém as linhas de fatos históricos inalteradas quando os atributos da dimensão são alterados:
- SQL
- Python
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Se a região de um usuário for alterada, apenas as linhas recentes serão recalculadas. As linhas históricas conservam o valor da região no momento em que foram escritas. Para corrigir linhas históricas, execute um preenchimento reverso direcionado usando substituições de predicado.
Exemplo 3: Adicionar novas métricas sem recalcular a história completa
Este exemplo mostra como evoluir a definição de uma tabela e preencher retroativamente apenas um intervalo específico:
- Defina a tabela inicial:
- SQL
- Python
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks
FROM clickstream_raw
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
return (
spark.read.table("clickstream_raw")
.groupBy("event_date", "page_id")
.agg(F.count("*").alias("clicks"))
)
- Atualize a consulta para adicionar
uniq_users:
- SQL
- Python
CREATE STREAMING TABLE clickstream_daily
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
page_id,
COUNT(*) AS clicks,
COUNT(DISTINCT user_id) AS uniq_users
FROM clickstream_raw
GROUP BY ALL;
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def clickstream_daily():
return (
spark.read.table("clickstream_raw")
.groupBy("event_date", "page_id")
.agg(
F.count("*").alias("clicks"),
F.countDistinct("user_id").alias("uniq_users"),
)
)
-
Preencha os novos dados dos últimos 30 dias:
Pythonoverrides = [
{
"flow_name": "clickstream_daily",
"predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["clickstream_daily"],
)As linhas mais antigas que o intervalo preenchido contêm
NULLparauniq_users.
Exemplo 4: Itere em uma pequena janela antes de preencher o histórico completo.
Este exemplo mostra como validar a lógica de consulta em uma pequena janela de dados antes de processar todo o intervalo histórico.
Comece com um intervalo curto para que cada refresh recalcule apenas os últimos 7 dias enquanto você revisa a consulta:
- SQL
- Python
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Após a finalização da consulta, utilize uma substituição de predicado para realizar um preenchimento histórico único:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)