Pular para o conteúdo principal

Preenchimento de dados históricos com LakeFlow Declarative pipeline

Na engenharia de dados, backfilling refere-se ao processo de processamento retroativo de dados históricos por meio de um site pipeline de dados que foi projetado para processar dados atuais ou de transmissão.

Normalmente, esse é um fluxo separado que envia dados para suas tabelas existentes. A ilustração a seguir mostra um fluxo de backfill enviando dados históricos para as tabelas de bronze em seu site pipeline.

Fluxo de backfill adicionando dados históricos a um fluxo de trabalho existente

Alguns cenários que podem exigir um preenchimento:

  • Processe dados históricos de um sistema legado para treinar um modelo de aprendizado de máquina (ML) ou criar um painel de análise de tendências históricas.
  • Reprocessar um subconjunto de dados devido a um problema de qualidade de dados com a fonte de dados upstream.
  • Suas necessidades comerciais mudaram e o senhor precisa preencher novamente os dados de um período diferente que não foi coberto pelo pipeline inicial.
  • Sua lógica de negócios mudou e você precisa reprocessar dados históricos e atuais.

Um backfill em LakeFlow O pipeline declarativo é compatível com um fluxo de acréscimo especializado que usa a opção ONCE. Consulte append_flow ou CREATE FLOW (LakeFlow Declarative pipeline) para obter mais informações sobre a opção ONCE.

Considerações sobre o backfilling de dados históricos em uma tabela de transmissão

  • Normalmente, o senhor anexa os dados à tabela de transmissão de bronze. As camadas de prata e ouro a jusante captarão os novos dados da camada de bronze.
  • Assegure-se de que seu pipeline possa lidar com dados duplicados de forma elegante, caso os mesmos dados sejam anexados várias vezes.
  • Assegurar que o esquema de dados históricos seja compatível com o esquema de dados atual.
  • Considere o tamanho do volume de dados e o tempo de processamento necessário SLA, e configure adequadamente os tamanhos de clustering e lotes.

Exemplo: Adição de um aterro a uma área existente pipeline

Neste exemplo, digamos que o senhor tenha um pipeline que ingere dados brutos de registro de eventos de uma fonte de armazenamento em nuvem, a partir de 1º de janeiro de 2025. Mais tarde, o senhor percebe que deseja preencher os três anos anteriores do histórico de dados para relatórios downstream e casos de uso de análise. Todos os dados estão em um único local, divididos por ano, mês e dia, no formato JSON.

Inicial pipeline

Aqui está o código inicial do pipeline que ingere de forma incremental os dados brutos de registro de eventos do armazenamento em nuvem.

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dlt.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}") # safeguard to not process data before begin_year
)

Aqui, usamos a opção modifiedAfter Auto Loader para garantir que não estamos processando todos os dados do caminho de armazenamento em nuvem. O processamento incremental é cortado nesse limite.

dica

Outras fontes de dados, como Kafka, Kinesis e Azure Event Hubs, têm opções de leitura equivalentes para obter o mesmo comportamento.

Preencher dados dos últimos 3 anos

Agora você quer adicionar um ou mais fluxos para preencher os dados anteriores. Neste exemplo, siga as seguintes etapas:

  • Use o fluxo append once. Isso executa um backfill único sem continuar a execução após esse primeiro backfill. O código permanece em seu site pipeline e, se o site pipeline for totalmente atualizado, o backfill será reexecutado.
  • Crie três fluxos de preenchimento, um para cada ano (nesse caso, os dados são divididos por ano no caminho). No Python, parametrizamos a criação dos fluxos, mas no SQL repetimos o código três vezes, uma vez para cada fluxo.

Se o senhor estiver trabalhando em seu próprio projeto e não estiver usando o serverless compute, talvez queira atualizar o max worker para o pipeline. Aumentar o max worker garante que o senhor tenha o recurso para processar os dados históricos enquanto continua a processar os dados da transmissão atual dentro do esperado SLA.

dica

Se o senhor usar serverless compute com autoscale aprimorado (o default), o clustering aumentará automaticamente de tamanho quando a carga aumentar.

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
backfill_path = f"{source_root_path}/year={year}/*/*"
@dlt.append_flow(
target="registration_events_raw",
once=True,
name=f"flow_registration_events_raw_backfill_{year}",
comment=f"Backfill {year} Raw registration events")
def backfill():
return (
spark
.read
.format("json")
.option("inferSchema", "true")
.load(backfill_path)
)

# create the streaming table
dlt.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dlt.append_flow(
target="registration_events_raw",
name="flow_registration_events_raw_incremental",
comment="Raw registration events")
def ingest():
return (
spark
.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.maxFilesPerTrigger", 100)
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
.load(incremental_load_path)
.where(f"year(timestamp) >= {begin_year}")
)

# parallelize one time multi years backfill for faster processing
# split backfill_years into array
for year in backfill_years.split(","):
setup_backfill_flow(year) # call the previously defined append_flow for each year

Essa implementação destaca vários padrões importantes.

Separação de interesses

  • O processamento incremental é independente das operações de backfill.
  • Cada fluxo tem suas próprias configurações e configurações de otimização.
  • Há uma clara distinção entre operações incrementais e de preenchimento.

Execução controlada

  • O uso da opção ONCE garante que cada backfill seja executado exatamente uma vez.
  • O fluxo de preenchimento permanece no site pipeline gráfico, mas torna-se parado depois de concluído. Ele está pronto para ser usado em refresh, automaticamente.
  • Há uma trilha de auditoria clara das operações de backfill na definição do site pipeline.

Otimização de processamento

  • Você pode dividir o preenchimento grande em vários preenchimentos menores para acelerar o processamento ou controlar o processamento.
  • O uso da escala automática aprimorada dimensiona dinamicamente o tamanho do clustering com base na carga de clustering atual.

evolução do esquema

  • O uso de schemaEvolutionMode="addNewColumns" lida com as mudanças de esquema normalmente.
  • Você tem uma inferência de esquema consistente em dados históricos e atuais.
  • Há um tratamento seguro de novas colunas em dados mais recentes.

Recurso adicional