
Backfilling historical data with Lakeflow Declarative Pipelines

In data engineering, backfilling refers to the process of retroactively processing historical data through a data pipeline that was designed for processing current or streaming data.

Typically, this is a separate flow sending data into your existing tables. The following illustration shows a backfill flow sending historical data to the bronze tables in your pipeline.

Backfill flow adding historical data to an existing workflow

Some scenarios that might require a backfill:

  • Process historical data from a legacy system to train a machine learning (ML) model or build a historical trend analysis dashboard.
  • Reprocess a subset of data due to a data quality issue with upstream data sources.
  • Your business requirements changed and you need to backfill data for a different time period that was not covered by the initial pipeline.
  • Your business logic changed and you need to reprocess both historical and current data.

A backfill in Lakeflow Declarative Pipelines is supported with a specialized append flow that uses the ONCE option. See append_flow or CREATE FLOW (Lakeflow Declarative Pipelines) for more information about the ONCE option.
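In Python, this is the once=True argument of the append_flow decorator. The following minimal sketch uses placeholder table, flow, and path names; the full pattern is shown in the example later on this page.

Python
import dlt

# minimal sketch (placeholder names and path): an append flow with once=True
# runs a single time, and again only if the pipeline is fully refreshed
@dlt.append_flow(
    target="events_raw",  # placeholder streaming table
    once=True,
    name="flow_events_raw_backfill",
    comment="One-time backfill of historical events")
def backfill():
    return spark.read.format("json").load("/path/to/historical/data")  # placeholder path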

Considerations when backfilling historical data into a streaming table

  • Typically, append the data to the bronze streaming table. Downstream silver and gold layers will pick up the new data from the bronze layer.
  • Ensure your pipeline can handle duplicate data gracefully in case the same data is appended multiple times (see the deduplication sketch after this list).
  • Ensure the historical data schema is compatible with the current data schema.
  • Consider the data volume and the required processing-time SLA, and configure the cluster and batch sizes accordingly.
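One common way to handle duplicates is to deduplicate in a downstream table rather than in the bronze layer. The following is a minimal sketch, assuming a hypothetical registration_id key column; the bronze table name matches the example later on this page, but this deduplication step is not part of that example.

Python
import dlt

# minimal sketch (assumed key column registration_id): keep one row per key so
# rows appended more than once to the bronze table do not appear twice downstream
@dlt.table(name="registration_events_clean", comment="Deduplicated registration events")
def registration_events_clean():
    return (
        spark.read.table("registration_events_raw")
        .dropDuplicates(["registration_id"])
    )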

Example: Adding a backfill to an existing pipeline

In this example, say you have a pipeline that ingests raw event registration data from a cloud storage source, starting Jan 01, 2025. You later realize that you want to backfill the previous three years of historical data for downstream reporting and analysis use cases. All data is in one location, partitioned by year, month, and day, in JSON format.

Initial pipeline

Here's the starting pipeline code that incrementally ingests the raw event registration data from cloud storage.

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
incremental_load_path = f"{source_root_path}/*/*/*"

# create a streaming table and the default flow to ingest streaming events
@dlt.table(name="registration_events_raw", comment="Raw registration events")
def ingest():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2025-01-01T00:00:00.000+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}")  # safeguard to not process data before begin_year
    )

Here we use the modifiedAfter Auto Loader option to avoid processing all of the data in the cloud storage path. Incremental processing is cut off at that boundary.

tip

Other data sources, such as Kafka, Kinesis, and Azure Event Hubs, have equivalent reader options to achieve the same behavior.
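For example, the Kafka source in Structured Streaming can start reading at a timestamp boundary. The following is a minimal sketch with assumed broker, topic, and partition values; startingOffsetsByTimestamp expects a JSON string mapping each topic and partition to an epoch timestamp in milliseconds.

Python
# minimal sketch (assumed broker and topic): start the Kafka read at a timestamp
# boundary, analogous to the modifiedAfter cutoff used with Auto Loader above
kafka_events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # assumed broker address
    .option("subscribe", "registration_events")  # assumed topic name
    # start partition 0 at the first offset on or after 2025-01-01T00:00:00Z (epoch ms)
    .option("startingOffsetsByTimestamp", """{"registration_events": {"0": 1735689600000}}""")
    .load()
)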

Backfill data from previous 3 years

Now you want to add one or more flows to backfill previous data. In this example, take the following steps:

  • Use the append once flow. This performs a one-time backfill without continuing to run after that first backfill. The code remains in your pipeline, and if the pipeline is ever fully refreshed, the backfill is re-run.
  • Create three backfill flows, one for each year (in this case, the data is split by year in the path). For Python, we parameterize the creation of the flows, but in SQL we repeat the code three times, once for each flow.

If you are working on your own project and not using serverless compute, you may want to update the max workers for the pipeline. Increasing the max workers ensures you have the resources to process the historical data while continuing to process the current streaming data within the expected SLA.

tip

If you use serverless compute with enhanced autoscaling (the default), then your cluster automatically increases in size when your load increases.

Python
import dlt

source_root_path = spark.conf.get("registration_events_source_root_path")
begin_year = spark.conf.get("begin_year")
backfill_years = spark.conf.get("backfill_years") # e.g. "2024,2023,2022"
incremental_load_path = f"{source_root_path}/*/*/*"

# meta programming to create append once flow for a given year (called later)
def setup_backfill_flow(year):
    backfill_path = f"{source_root_path}/year={year}/*/*"

    @dlt.append_flow(
        target="registration_events_raw",
        once=True,
        name=f"flow_registration_events_raw_backfill_{year}",
        comment=f"Backfill {year} Raw registration events")
    def backfill():
        return (
            spark.read
            .format("json")
            .option("inferSchema", "true")
            .load(backfill_path)
        )

# create the streaming table
dlt.create_streaming_table(name="registration_events_raw", comment="Raw registration events")

# append the original incremental, streaming flow
@dlt.append_flow(
    target="registration_events_raw",
    name="flow_registration_events_raw_incremental",
    comment="Raw registration events")
def ingest():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.maxFilesPerTrigger", 100)
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("modifiedAfter", "2024-12-31T23:59:59.999+00:00")
        .load(incremental_load_path)
        .where(f"year(timestamp) >= {begin_year}")
    )

# parallelize the one-time, multi-year backfill for faster processing
# split backfill_years into an array and create one append once flow per year
for year in backfill_years.split(","):
    setup_backfill_flow(year)  # call the previously defined append_flow for each year

This implementation highlights several important patterns.

Separation of concerns

  • Incremental processing is independent of backfill operations.
  • Each flow has its own configuration and optimization settings.
  • There is a clear distinction between incremental and backfill operations.

Controlled execution

  • Using the ONCE option ensures that each backfill runs exactly once.
  • The backfill flow remains in the pipeline graph but becomes idle after it completes. It runs again automatically if the pipeline is fully refreshed.
  • There is a clear audit trail of backfill operations in the pipeline definition.

Processing optimization

  • You can split a large backfill into multiple smaller backfills for faster processing or for finer control over processing.
  • Using enhanced autoscaling dynamically scales the cluster size based on the current cluster load.

Schema evolution

  • Using schemaEvolutionMode="addNewColumns" handles schema changes gracefully.
  • You have consistent schema inference across historical and current data.
  • There is safe handling of new columns in newer data.
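If some historical files are missing newer columns or would infer different types, Auto Loader schema hints can pin the types so that historical and current data land in the same table schema. The following is a minimal sketch; cloudFiles.schemaHints is an Auto Loader option, but the specific columns shown are assumptions for illustration.

Python
# minimal sketch (assumed columns): pin types that schema inference could resolve
# differently for historical files than for current files
events_with_hints = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    # assumed columns for illustration; replace with fields from your own data
    .option("cloudFiles.schemaHints", "registration_id BIGINT, timestamp TIMESTAMP")
    .load(incremental_load_path)
)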
