Tutorial: Build an ETL pipeline using change data capture with DLT
Learn how to create and deploy an ETL (extract, transform, and load) pipeline with change data capture (CDC) using DLT for data orchestration and Auto Loader. An ETL pipeline implements the steps to read data from source systems, transform that data based on requirements, such as data quality checks and record de-duplication, and write the data to a target system, such as a data warehouse or a data lake.
In this tutorial, you'll use data from a `customers` table in a MySQL database to:
- Extract the changes from a transactional database using Debezium or another tool and save them in cloud object storage (an S3 folder, ADLS, or GCS). To simplify the tutorial, you skip setting up an external CDC system.
- Use Auto Loader to incrementally load the messages from cloud object storage and store the raw messages in the `customers_cdc` table. Auto Loader will infer the schema and handle schema evolution.
- Add a view `customers_cdc_clean` to check data quality using expectations. For example, the `id` should never be `null` because you use it to run your upsert operations.
- Perform `APPLY CHANGES INTO` (the upserts) on the cleaned CDC data to apply the changes to the final `customers` table.
- Show how DLT can create a type 2 slowly changing dimension (SCD2) to keep track of all the changes.
The goal is to ingest the raw data in near real time and build a table for your analyst team while ensuring data quality.
The tutorial uses the medallion Lakehouse architecture, where it ingests raw data through the bronze layer, cleans and validates data with the silver layer, and applies dimensional modeling and aggregation using the gold layer. See What is the medallion lakehouse architecture? for more information.
The flow you will implement looks like this:
For more information about DLT, Auto Loader, and CDC, see DLT, What is Auto Loader?, and What is change data capture (CDC)?
Requirements
To complete this tutorial, you must meet the following requirements:
- Be logged into a Databricks workspace.
- Have Unity Catalog enabled for your workspace.
- Have serverless compute enabled for your account. Serverless DLT pipelines are not available in all workspace regions. See Features with limited regional availability for available regions.
- Have permission to create a compute resource or access to a compute resource.
- Have permissions to create a new schema in a catalog. The required permissions are `ALL PRIVILEGES`, or `USE CATALOG` and `CREATE SCHEMA`.
- Have permissions to create a new volume in an existing schema. The required permissions are `ALL PRIVILEGES`, or `USE SCHEMA` and `CREATE VOLUME`. An example of granting these privileges follows this list.
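If you are missing the schema or volume privileges, a metastore admin or the catalog owner can grant them. The following is a minimal sketch, assuming the `main` catalog used later in this tutorial and a placeholder user name; adjust both for your environment.

```python
# Minimal sketch (assumptions: catalog `main`, placeholder principal).
# Run as the catalog owner or a metastore admin.
principal = "someone@example.com"  # placeholder: your user or group name

# Catalog-level grants are inherited by the schemas inside the catalog.
spark.sql(f"GRANT USE CATALOG, CREATE SCHEMA, USE SCHEMA, CREATE VOLUME ON CATALOG main TO `{principal}`")
```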
Change data capture in an ETL pipeline
Change data capture (CDC) is the process that captures changes made to records in a transactional database (for example, MySQL or PostgreSQL) or a data warehouse. CDC captures operations such as data deletion, appends, and updates, typically as a stream, so the table can be re-materialized in external systems. CDC enables incremental loading and eliminates the need for bulk load updates.
To simplify this tutorial, you skip setting up an external CDC system. Assume that it is up and running and saves the CDC data as JSON files in blob storage (S3, ADLS, or GCS).
Capturing CDC
A variety of CDC tools are available. One of the leading open source solutions is Debezium, but other implementations that simplify the data source exist, such as Fivetran, Qlik Replicate, Streamset, Talend, Oracle GoldenGate, and AWS DMS.
In this tutorial, you use CDC data from an external system like Debezium or DMS. Debezium captures every changed row. It typically sends the history of data changes to Kafka logs or saves them as a file.
You must ingest the CDC information from the `customers` table (JSON format), check that it is correct, and then materialize the customer table in the Lakehouse.
CDC input from Debezium
For each change, you will receive a JSON message containing all the fields of the row being updated (`id`, `firstname`, `lastname`, `email`, `address`). In addition, you will have extra metadata information, including:

- `operation`: An operation code, typically (`DELETE`, `APPEND`, `UPDATE`).
- `operation_date`: The date and timestamp for the record for each operation action.
Tools like Debezium can produce more advanced output, such as the row value before the change, but this tutorial omits them for simplicity.
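For illustration, a single flattened message might look like the following. The values are fabricated, and the structure mirrors the dataset generated in Step 0 rather than Debezium's full envelope.

```python
# Illustrative only: one flattened CDC message as used in this tutorial (fabricated values).
example_cdc_message = {
    "id": "c5e7f9a2-1b3d-4e6f-8a9b-0c1d2e3f4a5b",  # primary key of the customer row
    "firstname": "Ada",
    "lastname": "Lovelace",
    "email": "ada.lovelace@example.com",
    "address": "42 Analytical Engine Way, London",
    "operation": "UPDATE",                          # DELETE, APPEND, or UPDATE
    "operation_date": "01-15-2025 09:30:00",        # %m-%d-%Y %H:%M:%S, matching the Step 0 generator
}
```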
Step 0: Set up the tutorial data
First, you must create a new notebook and install the demo files used in this tutorial into your workspace.
- Click New in the upper-left corner.
- Click Notebook.
- Change the title of the notebook from Untitled Notebook <date and time> to DLT tutorial setup.
- Next to your notebook's title at the top, set the notebook's default language to Python.
- To generate the dataset used in the tutorial, enter the following code in the first cell and press Shift+Enter to run it:
```python
# You can change the catalog, schema, dbName, and db. If you do so, you must also
# change the names in the rest of the tutorial.
catalog = "main"
schema = dbName = db = "dbdemos_dlt_cdc"
volume_name = "raw_data"

spark.sql(f'CREATE CATALOG IF NOT EXISTS `{catalog}`')
spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume_name}`')
volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

try:
    # If the folder already exists, skip data generation.
    dbutils.fs.ls(volume_folder + "/customers")
except:
    print(f"folder doesn't exist, generating the data under {volume_folder}...")
    from pyspark.sql import functions as F
    from faker import Faker
    from collections import OrderedDict
    import uuid
    import random

    fake = Faker()
    fake_firstname = F.udf(fake.first_name)
    fake_lastname = F.udf(fake.last_name)
    fake_email = F.udf(fake.ascii_company_email)
    fake_date = F.udf(lambda: fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
    fake_address = F.udf(fake.address)
    operations = OrderedDict([("APPEND", 0.5), ("DELETE", 0.1), ("UPDATE", 0.3), (None, 0.01)])
    fake_operation = F.udf(lambda: fake.random_elements(elements=operations, length=1)[0])
    fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

    # Generate 100,000 fake CDC records and write them as JSON to the volume.
    df = spark.range(0, 100000).repartition(100)
    df = df.withColumn("id", fake_id())
    df = df.withColumn("firstname", fake_firstname())
    df = df.withColumn("lastname", fake_lastname())
    df = df.withColumn("email", fake_email())
    df = df.withColumn("address", fake_address())
    df = df.withColumn("operation", fake_operation())
    df_customers = df.withColumn("operation_date", fake_date())
    df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder + "/customers")
```
- To preview the data used in this tutorial, enter the following code in the next cell and press Shift+Enter to run it:

```python
display(spark.read.json("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers"))
```
Step 1: Create a pipeline
First, you will create an ETL pipeline in DLT. DLT creates pipelines by resolving dependencies defined in notebooks or files (called source code) using DLT syntax. Each source code file can contain only one language, but you can add multiple language-specific notebooks or files to the pipeline. To learn more, see DLT.
Leave the Source code field blank to automatically create and configure a notebook for source code authoring.
This tutorial uses serverless compute and Unity Catalog. For all configuration options that are not specified, use the default settings. If serverless compute is not enabled or supported in your workspace, you can complete the tutorial as written using default compute settings. If you use default compute settings, you must manually select Unity Catalog under Storage options in the Destination section of the Create pipeline UI.
To create a new ETL pipeline in DLT, follow these steps:
- In the sidebar, click Pipelines.
- Click Create pipeline and ETL pipeline.
- In Pipeline name, type a unique pipeline name.
- Select the Serverless checkbox.
- Select Triggered in Pipeline mode. This will run the streaming flows using the AvailableNow trigger, which processes all existing data and then shuts down the stream.
- In Destination, to configure a Unity Catalog location where tables are published, select an existing Catalog and write a new name in Schema to create a new schema in your catalog.
- Click Create.
The pipeline UI appears for the new pipeline.
A blank source code notebook is automatically created and configured for the pipeline. The notebook is created in a new directory in your user directory. The name of the new directory and file match the name of your pipeline. For example, `/Users/someone@example.com/my_pipeline/my_pipeline`.
- A link to access this notebook is under the Source code field in the Pipeline details panel. Click the link to open the notebook before proceeding to the next step.
- Click Connect in the upper-right to open the compute configuration menu.
- Hover over the name of the pipeline you created in Step 1.
- Click Connect.
- Next to your notebook's title at the top, select the notebook's default language (Python or SQL).
Notebooks can only contain a single programming language. Do not mix Python and SQL code in pipeline source code notebooks.
When developing a DLT pipeline, you can choose either Python or SQL. This tutorial includes examples for both languages. Based on your language choice, make sure you select the matching default language for the notebook.
To learn more about notebook support for DLT pipeline code development, see Develop and debug ETL pipelines with a notebook in DLT.
Step 2: Incrementally ingest data with Auto Loader
The first step is to ingest the raw data from the cloud storage into a bronze layer.
This can be challenging for multiple reasons, as you must:
- Operate at scale, potentially ingesting millions of small files.
- Infer schema and JSON type.
- Handle bad records with incorrect JSON schema.
- Take care of schema evolution (for example, a new column in the customer table).
Auto Loader simplifies this ingestion, including schema inference and schema evolution, while scaling to millions of incoming files. Auto Loader is available in Python using `cloudFiles` and in SQL using `SELECT * FROM STREAM read_files(...)`, and it can be used with a variety of formats (JSON, CSV, Apache Avro, and so on):
Defining the table as a streaming table will guarantee that you only consume new incoming data. If you do not define it as a streaming table, you will scan and ingest all the available data. See Streaming tables for more information.
- To ingest the incoming data using Auto Loader, copy and paste the following code into the first cell in the notebook. Use Python or SQL, depending on the default notebook language you chose in the previous step.

Python

```python
import dlt
from dlt import *
from pyspark.sql.functions import *

# Create the target bronze table
dlt.create_streaming_table("customers_cdc_bronze", comment="New customer data incrementally ingested from cloud object storage landing zone")

# Create an Append Flow to ingest the raw data into the bronze table
@append_flow(
    target="customers_cdc_bronze",
    name="customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/Volumes/main/dbdemos_dlt_cdc/raw_data/customers")
    )
```

SQL

```sql
CREATE OR REFRESH STREAMING TABLE customers_cdc_bronze
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

CREATE FLOW customers_bronze_ingest_flow AS
INSERT INTO customers_cdc_bronze BY NAME
SELECT *
FROM STREAM read_files(
  "/Volumes/main/dbdemos_dlt_cdc/raw_data/customers",
  format => "json",
  inferColumnTypes => "true"
);
```

- Click Start to start an update for the connected pipeline.
Step 3: Cleanup and expectations to track data quality
After the bronze layer is defined, you will create the silver layer by adding expectations that control data quality by checking the following conditions:

- The ID must never be `null`.
- The CDC operation type must be valid.
- The JSON must have been read correctly by Auto Loader.

A row is dropped if it doesn't meet one of these conditions.
See Manage data quality with pipeline expectations for more information.
- Click Edit and Insert cell below to insert a new empty cell.
- To create a silver layer with a cleansed table and impose constraints, copy and paste the following code into the new cell in the notebook.

Python

```python
dlt.create_streaming_table(
    name="customers_cdc_clean",
    expect_all_or_drop={
        "no_rescued_data": "_rescued_data IS NULL",
        "valid_id": "id IS NOT NULL",
        "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
    }
)

@append_flow(
    target="customers_cdc_clean",
    name="customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
    return (
        dlt.read_stream("customers_cdc_bronze")
            .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
    )
```

SQL

```sql
CREATE OR REFRESH STREAMING TABLE customers_cdc_clean (
  CONSTRAINT no_rescued_data EXPECT (_rescued_data IS NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_operation EXPECT (operation IN ('APPEND', 'DELETE', 'UPDATE')) ON VIOLATION DROP ROW
)
COMMENT "New customer data incrementally ingested from cloud object storage landing zone";

CREATE FLOW customers_cdc_clean_flow AS
INSERT INTO customers_cdc_clean BY NAME
SELECT * FROM STREAM customers_cdc_bronze;
```

- Click Start to start an update for the connected pipeline.
Step 4: Materialize the customers table with apply changes
The `customers` table will contain the most up-to-date view and be a replica of the original table.
This is nontrivial to implement manually. You must consider things like data deduplication to keep the most recent row.
However, DLT solves these challenges with the apply changes operation.
- Click Edit and Insert cell below to insert a new empty cell.
- To process the CDC data using apply changes in DLT, copy and paste the following code into the new cell in the notebook.

Python

```python
dlt.create_streaming_table(name="customers", comment="Clean, materialized customers")

dlt.apply_changes(
    target="customers",                             # The customer table being materialized
    source="customers_cdc_clean",                   # The incoming CDC data
    keys=["id"],                                    # What we'll be using to match the rows to upsert
    sequence_by=col("operation_date"),              # De-duplicate by operation date, getting the most recent value
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE condition
    except_column_list=["operation", "operation_date", "_rescued_data"],
)
```

SQL

```sql
CREATE OR REFRESH STREAMING TABLE customers;

APPLY CHANGES INTO customers
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 1;
```

- Click Start to start an update for the connected pipeline.
Step 5: Slowly Changing Dimension of type 2 (SCD2)
It's often required to create a table that tracks all the changes resulting from `APPEND`, `UPDATE`, and `DELETE`:
- History: You want to keep a history of all the changes to your table.
- Traceability: You want to see which operation occurred.
SCD2 with DLT
Delta supports the change data feed (CDF), and `table_changes` can be used to query table modifications in SQL and Python. However, CDF's main use case is to capture changes in a pipeline, not to create a full view of the table's changes from the beginning.
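For reference only, querying a table's change data feed directly might look like the following sketch. It assumes CDF is enabled on the table (`delta.enableChangeDataFeed = true`), which is not something this tutorial sets up.

```python
# Illustrative only: query the Delta change data feed of a table with the table_changes function.
# Assumes CDF is enabled on the table; the table name and starting version are placeholders.
changes = spark.sql("""
  SELECT id, _change_type, _commit_version, _commit_timestamp
  FROM table_changes('main.dbdemos_dlt_cdc.customers', 1)
""")
display(changes)
```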
Things get especially complex to implement if you have out-of-order events. If you must sequence your changes by a timestamp and receive a modification that happened in the past, then you must append a new entry in your SCD table and update the previous entries.
DLT removes this complexity and lets you create a separate table containing all the modifications from the beginning of time. This table can then be used at scale, with specific partition or Z-order columns if required. Out-of-order events are handled out of the box based on the sequencing column.
To create an SCD2 table, use `APPLY CHANGES` with the extra option `STORED AS SCD TYPE 2` in SQL, or `stored_as_scd_type="2"` in Python.
You can also limit which columns the feature tracks using the option `TRACK HISTORY ON {columnList | EXCEPT(exceptColumnList)}`.
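In Python, the corresponding arguments to `dlt.apply_changes()` are `track_history_column_list` and `track_history_except_column_list`. The following sketch is illustrative only and is not part of the pipeline you build in this tutorial; the target table name is hypothetical. The cell you add in the next steps uses the default SCD2 behavior without limiting tracked columns.

```python
import dlt
from pyspark.sql.functions import col

# Illustrative only (not part of this tutorial's pipeline): limit history tracking to specific columns.
# The target table name below is hypothetical.
dlt.create_streaming_table(name="customers_history_tracked")

dlt.apply_changes(
    target="customers_history_tracked",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    stored_as_scd_type="2",
    track_history_column_list=["address", "email"],  # only changes to these columns start or end a history record
)
```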
- Click Edit and Insert cell below to insert a new empty cell.
- Copy and paste the following code into the new cell in the notebook.

Python

```python
# Create the table
dlt.create_streaming_table(
    name="customers_history", comment="Slowly Changing Dimension Type 2 for customers"
)

# Store all changes as SCD2
dlt.apply_changes(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2",  # Enable SCD2 and store individual updates
)
```

SQL

```sql
CREATE OR REFRESH STREAMING TABLE customers_history;

APPLY CHANGES INTO customers_history
FROM stream(customers_cdc_clean)
KEYS (id)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY operation_date
COLUMNS * EXCEPT (operation, operation_date, _rescued_data)
STORED AS SCD TYPE 2;
```

- Click Start to start an update for the connected pipeline.
Step 6: Create a materialized view that tracks who has changed their information the most
The `customers_history` table contains all historical changes a user has made to their information. You will now create a simple materialized view in the gold layer that keeps track of who has changed their information the most. In a real-world scenario, this could be used for fraud detection analysis or user recommendations. Additionally, applying changes with SCD2 has already removed duplicates, so you can directly count the rows per user ID.
- Click Edit and Insert cell below to insert a new empty cell.
- Copy and paste the following code into the new cell in the notebook.

Python

```python
@dlt.table(
    name="customers_history_agg",
    comment="Aggregated customer history"
)
def customers_history_agg():
    return (
        dlt.read("customers_history")
            .groupBy("id")
            .agg(
                count("address").alias("address_count"),
                count("email").alias("email_count"),
                count("firstname").alias("firstname_count"),
                count("lastname").alias("lastname_count")
            )
    )
```

SQL

```sql
CREATE OR REPLACE MATERIALIZED VIEW customers_history_agg AS
SELECT
  id,
  count(address) AS address_count,
  count(email) AS email_count,
  count(firstname) AS firstname_count,
  count(lastname) AS lastname_count
FROM customers_history
GROUP BY id;
```

- Click Start to start an update for the connected pipeline.
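After the pipeline update completes, you can query the gold table from any notebook or the SQL editor. For example, the following illustrative snippet lists the customers with the most address changes; it assumes the default catalog and schema names used throughout this tutorial.

```python
from pyspark.sql.functions import col

# Illustrative only: inspect the gold table after the pipeline update completes.
top_changers = (
    spark.table("main.dbdemos_dlt_cdc.customers_history_agg")
    .orderBy(col("address_count").desc())
    .limit(10)
)
display(top_changers)
```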
Step 7: Create a job to run the DLT pipeline
Next, create a workflow to automate data ingestion, processing, and analysis steps using a Databricks job.
- In your workspace, click Workflows in the sidebar and click Create job.
- In the task title box, replace New Job <date and time> with your job name. For example, `CDC customers workflow`.
- In Task name, enter a name for the first task, for example, `ETL_customers_data`.
. - In Type, select Pipeline.
- In Pipeline, select the DLT pipeline you created in step 1.
- Click Create.
- To run the workflow, click Run Now. To view the details for the run, click the Runs tab. Click the task to view details for the task run.
- To view the results when the workflow is completed, click Go to the latest successful run or the Start time for the job run. The Output page appears and displays the query results.
See Monitoring and observability for Databricks Jobs for more information about job runs.
Step 8: Schedule the DLT pipeline job
To run the ETL pipeline on a schedule, follow these steps:
- Click Workflows in the sidebar.
- In the Name column, click the job name. The Job details side panel appears.
- Click Add trigger in the Schedules & Triggers panel and select Scheduled in Trigger type.
- Specify the period, starting time, and time zone.
- Click Save.
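If you prefer to automate Steps 7 and 8, you can create and schedule the same job programmatically. The following sketch assumes the Databricks SDK for Python is installed and authenticated, and that you substitute your own pipeline ID; it is optional and not required for this tutorial.

```python
# Illustrative only: create and schedule the job with the Databricks SDK for Python.
# Assumptions: databricks-sdk is installed and authenticated; <your-pipeline-id> is a placeholder.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

job = w.jobs.create(
    name="CDC customers workflow",
    tasks=[
        jobs.Task(
            task_key="ETL_customers_data",
            pipeline_task=jobs.PipelineTask(pipeline_id="<your-pipeline-id>"),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",  # daily at 06:00
        timezone_id="UTC",
    ),
)
print(f"Created job {job.job_id}")
```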