Melhores práticas para o pipeline declarativo LakeFlow Spark
Esta página descreve os padrões recomendados para projetar, construir e operar pipelines com o pipeline declarativo LakeFlow Spark . Aplique estas diretrizes ao iniciar um novo pipeline ou aprimorar um já existente.
Escolha o tipo de dataset correto
O pipeline declarativo LakeFlow Spark oferece três tipos de dataset : tabelas de transmissão, visão materializada e visão temporária. Escolher o tipo certo para cada camada do seu pipeline evita custos compute desnecessários e mantém seu código fácil de entender.
tabelas de transmissão são a escolha certa para ingestão de dados e transmissões de baixa latência. Cada linha de entrada é lida e processada apenas uma vez, o que as torna ideais para cargas de trabalho de acréscimo, dados de alto volume e processamento orientado a eventos provenientes de armazenamento cloud ou barramentos de mensagens.
As visões materializadas são a escolha certa para transformações complexas e consultas analíticas. Os resultados são pré-computados e mantidos atualizados por meio de refresh incremental, portanto, as consultas a eles são rápidas. Não é possível modificar diretamente os dados em uma view materializada — a definição da consulta controla a saída.
As visualizações temporárias são visualizações com escopo pipelineque organizam a lógica de suas transformações sem materializar quaisquer dados no armazenamento. Use-os para passos intermediários que não precisam de uma tabela própria.
A tabela a seguir resume quando usar cada tipo:
Caso de uso | Tipo recomendado | Razão |
|---|---|---|
Ingestão a partir de armazenamento cloud ou barramento de mensagens | Tabela de transmissão | Processa cada registro uma única vez; lida com alto volume de trabalho e cargas de trabalho somente de acréscimo. |
Transmissão CDC (inserções, atualizações, exclusões) | Tabela de transmissão | Usado como alvo de |
Agregações complexas e junções | Visualização materializada | Atualização incremental; evita o recálculo completo a cada atualização. |
Aceleração de consultas no painel de controle | Visualização materializada | Resultados pré-computados tornam as consultas mais rápidas do que em tabelas brutas. |
Transformações intermediárias (sem leitores downstream) | viewtemporária | Organiza a lógica do pipeline sem incorrer em custos de armazenamento. |
Para obter mais informações, consulte tabelas de transmissão, visualização materializada e conceitos de pipeline declarativoLakeFlow Spark.
Use CDC declarativo em vez de merge imperativo
A implementação da captura de dados de alterações (CDC) (CDC) com instruções SQL imperativas MERGE requer um código personalizado significativo para lidar corretamente com a ordenação de eventos, desduplicação, atualizações parciais e evolução do esquema. Cada uma dessas questões deve ser resolvida de forma independente, e o código resultante é difícil de manter e testar.
O pipeline declarativo LakeFlow Spark fornece a instrução APPLY CHANGES INTO (SQL) e a função apply_changes() (Python), que lidam com ordenação, deduplicação, eventos fora de ordem e evolução do esquema de forma declarativa. Você descreve o formato do feed de alterações e a tabela de destino — o pipeline cuida do resto. APPLY CHANGES INTO suporta tanto SCD Tipo 1 (sobrescrita) quanto SCD Tipo 2 (preservação do histórico).
Para mais informações, consulte O que é captura de dados de alterações (CDC) (CDC)? e APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline.
Garantir a qualidade dos dados com base nas expectativas.
Expectativas são expressões SQL de verdadeiro/falso aplicadas a cada linha que passa por um dataset. Quando uma linha não atende à condição, o pipeline responde de acordo com a política de violação que você configurou. As expectativas emitem métricas para o log de eventos do pipeline, independentemente da política, permitindo acompanhar as tendências de qualidade dos dados ao longo do tempo.
Escolha uma política de violação
Três políticas de violação estão disponíveis. Escolha aquela que melhor se adequa à sua tolerância a dados incorretos:
- aviso (default): Registros inválidos são gravados na tabela de destino e sinalizados nas métricas. Utilize esta política quando precisar coletar todos os dados, mas desejar ter visibilidade sobre problemas de qualidade.
- drop : Registros inválidos são descartados antes da gravação. Use esta opção quando linhas com erros forem esperadas e não devem ser propagadas para os fluxos subsequentes.
- falha : A atualização do pipeline é interrompida no primeiro registro inválido. Use isso para dados críticos, onde qualquer registro incorreto indica um problema grave a montante.
Os exemplos a seguir mostram cada política aplicada a uma mesa de transmissão:
- SQL
- Python
-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");
-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);
-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);
from pyspark import pipelines as dp
# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/volumes/raw/orders")
# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
return spark.readStream.table("orders_raw")
# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
return spark.readStream.table("orders_clean")
Registros inválidos em quarentena
Quando você deseja preservar registros perdidos para investigação, em vez de descartá-los silenciosamente, use um padrão de quarentena. Direcione as linhas que não passarem na validação para uma tabela de transmissão separada usando dois fluxos: um que descarta as linhas inválidas da tabela principal e um segundo que grava apenas as linhas inválidas em uma tabela de quarentena. Isso permite investigar, corrigir e reprocessar dados incorretos sem contaminar seu dataset limpo.
Para um exemplo detalhado do padrão de quarentena, consulte Recomendações de expectativa e padrões avançados.
Para obter mais informações sobre expectativas, consulte Gerenciar a qualidade dos dados com as expectativas pipeline.
Parametrize seu pipeline
O pipeline possui configurações default de catálogo e esquema, portanto, o código que lê e grava dentro do mesmo catálogo e esquema funciona em diferentes ambientes sem a necessidade de parâmetros. No entanto, se o seu pipeline precisar referenciar um segundo catálogo ou esquema — por exemplo, lendo de um catálogo de origem compartilhado que difere entre desenvolvimento e produção — evite codificar esses nomes diretamente no seu código-fonte. Em vez disso, defina-os como parâmetros de configuração pipeline (par key-valor definido nas configurações pipeline ) e faça referência a eles em seu código. Isso permite que uma única base de código seja executada corretamente em diferentes ambientes, trocando os valores dos parâmetros.
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;
from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum
@dp.materialized_view
def transaction_summary():
source_catalog = spark.conf.get("source_catalog")
return spark.read.table(f"{source_catalog}.sales.transactions") \
.groupBy("account_id") \
.agg(
count("txn_id").alias("txn_count"),
sum("amount").alias("total_amount")
)
Para obter mais informações, consulte Usar parâmetros com pipeline.
Escolha o modo de pipeline adequado para cada ambiente.
Modos de atualização de desenvolvimento e produção
Execução do pipeline em modo de atualização de desenvolvimento ou produção . Escolha o modo que melhor se adequa ao seu objetivo.
No modo de desenvolvimento , o pipeline reutiliza uma execução declusterlongo em todas as atualizações e não tenta novamente em caso de erros. Isso acelera o ciclo de iteração ao criar e testar o código do pipeline, pois você obtém detalhes do erro imediatamente, sem precisar esperar a reinicialização do cluster.
Em modo de produção , o cluster é desligado imediatamente após a conclusão de cada atualização, o que reduz os custos compute . O pipeline também aplica tentativas de reinicialização progressivas, incluindo reinicializações de cluster, para lidar automaticamente com falhas transitórias de infraestrutura. Utilize o modo de produção para todas as execuções pipeline agendadas.
Modo de pipeline acionado versus contínuo
O modo acionado processa todos os dados disponíveis e, em seguida, para. É a escolha certa para a grande maioria dos pipelines: aqueles que são executados de forma programada (por hora, diariamente ou sob demanda) e não exigem atualização de dados em menos de um minuto.
O modo contínuo mantém o cluster em execução e processa novos dados à medida que chegam. É apropriado apenas quando o seu caso de uso exige uma latência na faixa de segundos a minutos. Como o modo contínuo exige um cluster sempre ativo, ele é significativamente mais caro do que o modo acionado por gatilho.
Para obter mais informações, consulte Modo pipeline acionado vs. contínuo e Configurar pipeline.
Use clustering líquido para disponibilidade de dados
clustering líquido substitui o particionamento estático e ZORDER para otimizar a disposição de dados em tabelas Delta . Ao contrário do particionamento, que exige a escolha prévia de uma coluna de partição e pode causar distorção de dados quando os valores são distribuídos de forma desigual, o clustering líquido é autoajustável, resistente à distorção e incremental — apenas os dados que precisam ser reorganizados são reescritos a cada execução.
Altere as colunas clustering a qualquer momento sem precisar reescrever a tabela inteira, conforme os padrões de consulta evoluem.
Defina as colunas clustering na definição da sua tabela:
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");
from pyspark import pipelines as dp
@dp.table(cluster_by=["event_date", "region"])
def events():
return spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "parquet") \
.load("/volumes/raw/events")
Se você não tiver certeza de quais colunas usar para cluster , use CLUSTER BY AUTO para permitir que Databricks selecione automaticamente as colunas clustering ideais com base na sua carga de trabalho de consultas.
Para mais informações, veja tabelas de transmissão e Use clustering líquido para tabelas.
gerenciando pipeline com CI/CD e Databricks ativo Bundles
Controle as versões do código-fonte do seu pipeline e use Databricks Ativo Bundles para gerenciar implantações em diferentes ambientes.
Para obter mais informações, consulte Criar um pipelinecom controle de versão, Converter um pipeline em um projeto Databricks Ativo Bundle e Usar parâmetros com pipeline.
Armazene o código do pipeline no controle de versão.
Armazene todos os arquivos de origem do pipeline (Python e SQL) juntamente com a configuração do seu pacote em um repositório Git. Controlar as versões de todo o projeto fornece um histórico completo das alterações, facilita a colaboração e permite validar as alterações em um ambiente de desenvolvimento antes de promovê-las para produção.
Databricks recomenda Databricks ativo Bundles para gerenciar esse fluxo de trabalho. Um pacote define a configuração do seu pipeline em YAML junto com seu código fonte, e a CLI databricks bundle permite validar, implantar e executar o pipeline a partir do seu terminal ou de um sistema CI/CD .
Use alvos de pacote para isolamento de ambiente.
Os pacotes permitem múltiplos destinos (por exemplo, dev, staging, prod), cada um com seu próprio conjunto de substituições para nomes de catálogo, política de cluster, endereços de notificação e outras configurações. Combine os destinos do pacote com os parâmetros pipeline para injetar os valores corretos específicos do ambiente no momento da implantação, mantendo seu código-fonte livre de constantes de ambiente.
Um fluxo de trabalho típico se parece com isto:
- Um desenvolvedor trabalha em um branch de recurso, implantado em um pipeline de desenvolvimento pessoal em um catálogo de desenvolvimento.
- Na merge com o branch principal, um sistema CI executa
databricks bundle validateedatabricks bundle deploy --target stagingpara validar e implantar o pipeline em um ambiente de teste. - Após a aprovação nos testes, o sistema CI implanta em produção com
databricks bundle deploy --target prod.
melhores práticas
Utilize esses padrões para gerenciar o estado, controlar dados atrasados e manter o pipeline de transmissão confiável.
Para obter mais informações, consulte Otimizar o processamento com estado usando marcas d'água, Recuperar um pipeline de uma falha de ponto de verificação de transmissão e Preenchimento retroativo de dados históricos com pipeline.
Use marcas d'água para operações com estado.
As marcas d'água delimitam o estado que o pipeline mantém na memória durante operações de transmissão com estado, como agregações em janelas e desduplicação. Sem uma marca d'água, o estado cresce indefinidamente à medida que o pipeline acumula dados para cada key possível, eventualmente causando erros de falta de memória em pipelines de longa duração.
Uma marca d'água especifica uma coluna de registro de data e hora e um limite de tolerância para dados atrasados. Registros que chegam após o limite ter sido ultrapassado são descartados. Escolha um limite que equilibre sua tolerância a dados atrasados com o custo de memória de manter esse estado aberto.
O exemplo a seguir calcula uma agregação de janela deslizante de um minuto com uma marca d'água de três minutos:
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;
from pyspark import pipelines as dp
from pyspark.sql.functions import window
@dp.table
def event_counts():
return (
spark.readStream.table("events_raw")
.withWatermark("event_time", "3 minutes")
.groupBy(window("event_time", "1 minute"), "region")
.count()
)
Para garantir que as agregações sejam processadas incrementalmente, em vez de serem totalmente recalculadas a cada atualização, você deve definir uma marca d'água.
Entenda o estado da transmissão e refreshcompleta
O estado da transmissão é incremental: o pipeline constrói e mantém o estado ao longo das atualizações, em vez de recalcular do zero a cada vez. É isso que torna a transmissão com estado eficiente, mas também significa que, se você alterar a lógica de uma consulta com estado (por exemplo, modificando um limite de marca d'água ou alterando colunas de agregação), o estado existente deixará de ser compatível com a nova lógica. Nesse caso, você deve realizar uma refresh completa para reprocessar todos os dados históricos com a nova lógica e reconstruir o estado do zero.
Uma refresh completa também pode levar à perda de dados se a fonte não reter o histórico de dados. Por exemplo, uma fonte Kafka com um curto período de retenção pode ter apenas os últimos minutos de dados disponíveis no momento da refresh, resultando em uma tabela que contém muito menos dados do que antes. Planeje cuidadosamente as alterações na lógica de consultas com estado, especialmente para transmissões de alto volume, onde uma refresh completa é dispendiosa ou onde a fonte tem retenção de dados limitada. Utilizar a arquitetura de medalhão ajuda a criar tabelas de bronze com transformações mínimas e permite que tabelas de prata ou ouro sejam recalculadas a partir das tabelas de bronze com a história completa.
transmissão-transmissão join
A junção transmissão-transmissão requer uma marca d'água em ambos os lados da join e uma condição join com limite de tempo. O intervalo de tempo na condição join informa ao mecanismo de transmissão quando não são mais possíveis correspondências, permitindo que ele remova estados que não podem mais ser correspondidos. Se você omitir as marcas d'água ou a condição de tempo limite, o estado cresce indefinidamente.
O exemplo a seguir combina eventos de impressão de anúncios com eventos de clique, exigindo que o clique ocorra em até três minutos após a impressão:
- SQL
- Python
CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
AND clk.click_time BETWEEN imp.impression_time
AND imp.impression_time + INTERVAL 3 MINUTES;
from pyspark import pipelines as dp
from pyspark.sql.functions import expr
dp.create_streaming_table("impression_clicks")
@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
impressions = spark.readStream.table("ad_impressions") \
.withWatermark("impression_time", "3 minutes")
clicks = spark.readStream.table("user_clicks") \
.withWatermark("click_time", "3 minutes")
return impressions.alias("imp").join(
clicks.alias("clk"),
expr("""
imp.ad_id = clk.ad_id AND
clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
"""),
"leftOuter"
)
Ao realizar uma join de transmissão com uma tabela estática (uma join de Snapshot), a tabela estática Snapshot é atualizada no início de cada microlote. Isso significa que os registros de dimensões que chegam com atraso não são aplicados retroativamente a fatos que já foram processados. Caso seja necessária a aplicação retroativa, utilize uma view materializada ou reestruture o pipeline.
Otimizar o desempenho pipeline
Aplique essas técnicas para reduzir os custos compute e acelerar as atualizações pipeline .
Para obter mais informações, consulte Visualização materializada e Otimizar o processamento com estado usando marcas d'água.
Evite arquivos pequenos
Acionar um pipeline com muita frequência em uma fonte de baixo volume grava um grande número de arquivos pequenos no armazenamento cloud . Arquivos pequenos degradam o desempenho de leitura porque cada arquivo requer uma pesquisa de metadados e uma operação de entrada/saída separadas, e APIs de armazenamento cloud limitam as operações de listagem em escala. Para evitar isso, escolha um intervalo de disparo que corresponda ao seu volume de dados: execute um pipeline acionado por um programador que permita que uma quantidade significativa de dados se acumule entre as atualizações, em vez de continuamente.
Lidar com distorção de dados
A distorção de dados ocorre quando os valores em uma key join ou agrupamento são distribuídos de forma desigual entre as partições, fazendo com que um pequeno número de tarefas processe a maior parte dos dados. Isso cria pontos de acesso intenso que aumentam o tempo de atualização de ponta a ponta. Utilize clustering líquido para corrigir a distorção em tabelas armazenadas. Para corrigir distorções que ocorrem durante o processamento em tempo real, aplique um sufixo aleatório às chaves com alta distorção antes de agrupá-las e agregá-las em duas etapas.
Para mais informações, consulte Usar clustering líquido para disposição de dados.
Use refresh incremental para visualização materializada.
Ao usar uma view materializada para uma grande agregação, o pipeline declarativo do LakeFlow Spark tenta refresh la incrementalmente — processando apenas as alterações a montante desde a última atualização, em vez de recalcular todo o conjunto de resultados. refresh incremental é significativamente mais barata do que executar a consulta do zero a cada acionamento pipeline . Para maximizar a probabilidade de uma view materializada ser atualizada incrementalmente, escreva consultas de agregação simples e determinísticas e evite construções que impeçam o processamento incremental, como funções não determinísticas.
Consulte refresh incremental para visualização materializada.
Otimizar a adesão
Para junções em que um dos lados é uma tabela de dimensão pequena, adicione uma dica de broadcast para instruir Spark a transmitir a tabela menor para todos os executores em vez de realizar uma junção por embaralhamento (shuffle join:
- SQL
- Python
CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast
@dp.materialized_view
def enriched_orders():
orders = spark.read.table("orders")
products = spark.read.table("products")
return orders.join(broadcast(products), "product_id")
Para junção por proximidade de séries temporais (por exemplo, encontrar o evento mais próximo dentro de um intervalo de tempo), use uma condição join por intervalo e certifique-se de que ambos os lados tenham uma marca d'água ao realizar a junção de transmissões, ou considere agrupar os eventos em intervalos de tempo antes da junção.
Monitore seu oleoduto
O log de eventos pipeline é o principal elemento primitivo de observabilidade no pipeline declarativo LakeFlow Spark . Cada execução pipeline grava registros estruturados no log de eventos, abrangendo o progresso da execução, os resultados esperados da qualidade dos dados, a linhagem de dados e os detalhes dos erros. O log de eventos é uma tabela Delta que você pode consultar diretamente.
Para consultar o log de eventos sem conhecer o caminho de armazenamento subjacente, use a função com valor de tabela event_log() em um cluster compartilhado ou SQL warehouse:
SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;
Crie painéis de controle de qualidade de dados consultando o log de eventos para obter as métricas esperadas. A coluna details contém uma estrutura JSON aninhada com contagens de aprovação/reprovação para cada restrição, que você pode usar para acompanhar as tendências de qualidade ao longo do tempo e receber alertas sobre regressões.
Para alertas orientados a eventos, use ganchos de eventos para acionar webhooks personalizados ou serviços de notificação (como Slack ou PagerDuty) quando um pipeline falhar ou quando um limite de qualidade de dados for ultrapassado. Os ganchos de evento são funções Python que são executadas em resposta a eventos pipeline .
Para obter mais informações, consulte Monitorar pipeline, logde eventos do pipeline e Definir monitoramento personalizado do pipeline com ganchos de eventos.
Utilize compute serverless
Databricks recomenda compute serverless para novos pipelines. Com a arquitetura serverless, não há configuração manual cluster — Databricks gerencia a infraestrutura automaticamente. O pipeline sem servidor utiliza escalonamento automático aprimorado que pode aumentar tanto horizontalmente (mais executores) quanto verticalmente (tamanho maior executor ) em resposta às demandas de carga de trabalho. O pipeline sem servidor sempre usa Unity Catalog, portanto, a governança e o acompanhamento de linhagem estão integrados por default.
Para obter mais informações, consulte Configurar um pipeline serverless.
Organize o pipeline com a arquitetura Medallion.
A arquitetura em medalhão organiza os dados em três camadas lógicas — bronze, prata e ouro — cada uma com uma finalidade específica. Mapear os tipos de dataset do pipeline declarativo do LakeFlow Spark para a camada correta mantém as responsabilidades de cada camada claras e facilita a manutenção do pipeline.
- Bronze : Use tabelas de transmissão para ingerir dados brutos de armazenamento cloud , barramentos de mensagens ou fontes CDC . As tabelas de bronze preservam os dados brutos da fonte com transformações mínimas, possibilitando que as camadas de prata ou ouro reprocessem os dados a partir da fonte na camada de bronze caso os requisitos mudem.
- Prata : Use tabelas de transmissão para transformações incrementais em nível de linha (filtragem, limpeza e análise). Utilize a visualização materializada quando a lógica da camada prateada envolver junções de enriquecimento com tabelas de dimensão ou agregações complexas que se beneficiam de refresh incrementais.
- ouro : Use a visão materializada para pré-compute agregações, métricas e resumos que são servidos a painéis, ferramentas de relatório e consumidores subsequentes.
Separe a ingestão (bronze) e as transformações (silver e ouro) em DAGs pipeline distintos sempre que possível. O desacoplamento das camadas permite programar, monitorar e solucionar problemas de cada camada de forma independente, e uma falha em um pipeline de transformações não impede que novos dados cheguem ao ambiente de bronze.
Para mais informações consulte tabelas de transmissão e visualização materializada.