Load data with DLT

You can load data from any data source supported by Apache Spark on Databricks using DLT. You can define datasets (tables and views) in DLT against any query that returns a Spark DataFrame, including streaming DataFrames and pandas API on Spark DataFrames. For data ingestion tasks, Databricks recommends using streaming tables for most use cases. Streaming tables are well suited for ingesting data from cloud object storage using Auto Loader or from message buses like Kafka.

note

The examples below demonstrate some common patterns.

Load from an existing table

Load data from any existing table in Databricks. You can transform the data using a query, or load the table for further processing in your pipeline.

The following example reads data from an existing table:

Python
import dlt
from pyspark.sql.functions import desc, expr, sum

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
  )

Load files from cloud object storage

Databricks recommends using Auto Loader with DLT for most data ingestion tasks from cloud object storage or from files in a Unity Catalog volume. Auto Loader and DLT are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage.

See What is Auto Loader? and Load data from object storage.

The following example reads data from cloud storage using Auto Loader:

Python
import dlt

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("gs://mybucket/analysis/*/*/*.json")
  )

The following example uses Auto Loader to create a dataset from CSV files in a Unity Catalog volume:

Python
import dlt

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/Volumes/my_catalog/retail_org/customers/")
  )
note
  • If you use Auto Loader with file notifications and run a full refresh for your pipeline or streaming table, you must manually clean up your resources. You can use the CloudFilesResourceManager in a notebook to perform cleanup.
  • To load files with Auto Loader in a Unity Catalog enabled pipeline, you must use external locations. To learn more about using Unity Catalog with DLT, see Use Unity Catalog with your DLT pipelines.

Load data from a message bus

You can configure DLT pipelines to ingest data from message buses. Databricks recommends using streaming tables with continuous execution and enhanced autoscaling to provide the most efficient ingestion for low-latency loading from message buses. See Optimize the cluster utilization of DLT pipelines with Autoscaling.

For example, the following code configures a streaming table to ingest data from Kafka using the Structured Streaming Kafka source:

Python
import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka_server:9092")
      .option("subscribe", "topic1")
      .load()
  )

To ingest from other message bus sources, see the documentation for the specific connector.

Load data from Azure Event Hubs

Azure Event Hubs is a data streaming service that provides an Apache Kafka compatible interface. You can use the Structured Streaming Kafka connector, included in the DLT runtime, to load messages from Azure Event Hubs. To learn more about loading and processing messages from Azure Event Hubs, see Use Azure Event Hubs as a DLT data source.
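Because Event Hubs speaks the Kafka protocol on port 9093 with SASL/PLAIN authentication, the Kafka connector options can be assembled from the namespace, event hub name, and connection string. The sketch below builds that options map as a plain Python dict; the helper name and all placeholder values are illustrative, not part of any DLT API (note the kafkashaded prefix on the login module class, which reflects how the Kafka client is shaded in the Databricks runtime):

```python
# Sketch: Kafka-compatible connector options for an Azure Event Hub.
# The helper name and arguments are hypothetical; only the option keys
# follow the Structured Streaming Kafka source.
def event_hubs_kafka_options(namespace, event_hub, connection_string):
    # Event Hubs authenticates over SASL/PLAIN; the literal username
    # is "$ConnectionString" and the password is the connection string.
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "subscribe": event_hub,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }
```

Inside a pipeline, such a dict could be passed to the Kafka source with spark.readStream.format("kafka").options(**opts).load() from a @dlt.table function, with the connection string read from a Databricks secret rather than hardcoded.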

Load data from external systems

DLT supports loading data from any data source supported by Databricks. See Connect to data sources. You can also load external data using Lakehouse Federation for supported data sources. Because Lakehouse Federation requires Databricks Runtime 13.3 LTS or above, to use Lakehouse Federation your pipeline must be configured to use the preview channel.
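As a minimal sketch, selecting the preview channel is a single field in the pipeline settings JSON (all other settings elided; the pipeline name is a placeholder):

```json
{
  "name": "federated-ingest-pipeline",
  "channel": "PREVIEW"
}
```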

Some data sources do not have equivalent support in SQL. If you cannot use Lakehouse Federation with one of these data sources, you can use a Python notebook to ingest data from the source. You can add Python and SQL source code to the same DLT pipeline. The following example declares a materialized view to access the current state of data in a remote PostgreSQL table:

Python
import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

Load small or static datasets from cloud object storage

You can load small or static datasets using Apache Spark load syntax. DLT supports all of the file formats supported by Apache Spark on Databricks. For a full list, see Data format options.

The following example demonstrates loading JSON to create a DLT table:

Python
import dlt

@dlt.table
def clickstream_raw():
  return (
    spark.read.format("json")
      .load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json")
  )
note

The read_files SQL function is common to all SQL environments on Databricks. It is the recommended pattern for direct file access using SQL with DLT. For more information, see Options.
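Although read_files is a SQL function, the same query can be issued from Python through spark.sql. The sketch below only builds the query string so the shape of the call is visible; the helper name is hypothetical, and the path and format shown in the usage comment are assumptions:

```python
# Hypothetical helper that renders a read_files query for direct file
# access. read_files takes the path followed by named options such as
# format => '<file-format>'.
def read_files_query(path, file_format):
    return f"SELECT * FROM read_files('{path}', format => '{file_format}')"

# In a pipeline, the rendered string would be passed to spark.sql, e.g.:
# spark.sql(read_files_query("/Volumes/my_catalog/retail_org/customers/", "csv"))
```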

Configure a streaming table to ignore changes in a source streaming table

note
  • The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.
  • You cannot use the skipChangeCommits flag when the source streaming table is defined as the target of an apply_changes() function.

By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes (for example, GDPR "right to be forgotten" processing), you can set the skipChangeCommits flag when reading the source streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes.

Python
import dlt

@dlt.table
def b():
  return spark.readStream.option("skipChangeCommits", "true").table("A")

Securely access storage credentials with secrets in a pipeline

You can use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration. See Configure compute for a DLT pipeline.

The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage (ADLS) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.

To learn more about working with Azure Data Lake Storage, see Connect to Azure Data Lake Storage and Blob Storage.

note

You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.

JSON
{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}

Replace

  • <storage-account-name> with the ADLS storage account name.
  • <scope-name> with the Databricks secret scope name.
  • <secret-name> with the name of the key containing the Azure storage account access key.
Python
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"

@dlt.table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

Replace

  • <container-name> with the name of the Azure storage account container that stores the input data.
  • <storage-account-name> with the ADLS storage account name.
  • <path-to-input-dataset> with the path to the input dataset.