Load data with DLT
You can load data from any data source supported by Apache Spark on Databricks using DLT. You can define datasets (tables and views) in DLT against any query that returns a Spark DataFrame, including streaming DataFrames and pandas API on Spark DataFrames. For data ingestion tasks, Databricks recommends using streaming tables for most use cases. Streaming tables are well suited to ingesting data from cloud object storage using Auto Loader or from message buses like Kafka.
- Not all data sources have SQL support. You can mix SQL and Python notebooks in a DLT pipeline to use SQL for all operations beyond ingestion.
- For details on working with libraries not packaged in DLT by default, see Manage Python dependencies for DLT pipelines.
- For general information about ingestion in Databricks, see Ingest data into a Databricks lakehouse.
The examples below demonstrate some common patterns.
Load from an existing table
Load data from any existing table in Databricks. You can transform the data using a query, or load the table for further processing in your pipeline.
The following example reads data from an existing table:
Python:

import dlt
# sum here is the Spark SQL aggregate, which shadows Python's built-in sum.
from pyspark.sql.functions import desc, expr, sum

@dlt.table(
    comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
    return (
        spark.read.table("baby_names_prepared")
        .filter(expr("Year_Of_Birth == 2021"))
        .groupBy("First_Name")
        .agg(sum("Count").alias("Total_Count"))
        .sort(desc("Total_Count"))
    )

SQL:

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
Load files from cloud object storage
Databricks recommends using Auto Loader with DLT for most data ingestion tasks from cloud object storage or from files in a Unity Catalog volume. Auto Loader and DLT are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage.
See What is Auto Loader? and Load data from object storage.
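To make "incrementally and idempotently" concrete, the following toy model shows the idea in plain Python. It is only an illustration under simplifying assumptions, not how Auto Loader is implemented: the function name `load_new_files` and the in-memory `processed` set are hypothetical, whereas Auto Loader keeps its file-discovery state in the stream checkpoint.

```python
def load_new_files(available_files, processed):
    """Return files not seen before and record them as processed.

    `processed` is a set that stands in for persisted ingestion state.
    """
    new_files = sorted(set(available_files) - processed)
    processed.update(new_files)
    return new_files


processed = set()

# First run: both files are new, so both are loaded.
first = load_new_files(["2024/01/a.json", "2024/01/b.json"], processed)

# Second run: one more file has arrived, so only that file is loaded.
second = load_new_files(
    ["2024/01/a.json", "2024/01/b.json", "2024/02/c.json"], processed
)

# Third run: nothing new has arrived, so nothing is loaded again.
third = load_new_files(
    ["2024/01/a.json", "2024/01/b.json", "2024/02/c.json"], processed
)

print(first)
print(second)
print(third)
```

Each file is handed to the pipeline once, no matter how many times the load runs, which is the property that makes reruns safe.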
The following example reads data from cloud storage using Auto Loader:
Python:

import dlt

@dlt.table
def customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("gs://mybucket/analysis/*/*/*.json")
    )

SQL:

CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
  'gs://mybucket/analysis/*/*/*.json',
  format => "json"
);
The following examples use Auto Loader to create datasets from CSV files in a Unity Catalog volume:
Python:

import dlt

@dlt.table
def customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/Volumes/my_catalog/retail_org/customers/")
    )

SQL:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
  "/Volumes/my_catalog/retail_org/customers/",
  format => "csv"
)
- If you use Auto Loader with file notifications and run a full refresh for your pipeline or streaming table, you must manually clean up your resources. You can use the CloudFilesResourceManager in a notebook to perform cleanup.
- To load files with Auto Loader in a Unity Catalog-enabled pipeline, you must use external locations. To learn more about using Unity Catalog with DLT, see Use Unity Catalog with your DLT pipelines.
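The Auto Loader examples above set a single reader option. As a minimal sketch, assuming you want to keep reader options in one place, the options can be assembled as a dictionary and passed with `options(**...)`. The helper names `auto_loader_options` and `read_with_auto_loader` and the column `customer_id` are hypothetical; `cloudFiles.schemaHints` is an Auto Loader option and `header` is a CSV reader option.

```python
def auto_loader_options(file_format, schema_hints=None, extra=None):
    """Build the option dictionary for a cloudFiles (Auto Loader) reader."""
    options = {"cloudFiles.format": file_format}
    if schema_hints:
        # Schema hints are a DDL-style string, for example "customer_id BIGINT".
        options["cloudFiles.schemaHints"] = schema_hints
    if extra:
        # Format-specific reader options, such as the CSV header flag.
        options.update(extra)
    return options


def read_with_auto_loader(spark, path, options):
    """Return a streaming DataFrame. `spark` is the session the pipeline provides."""
    return spark.readStream.format("cloudFiles").options(**options).load(path)


csv_options = auto_loader_options(
    "csv",
    schema_hints="customer_id BIGINT",  # hypothetical column name
    extra={"header": "true"},
)
print(csv_options)
```

Inside a pipeline, the body of a `@dlt.table` function could then return `read_with_auto_loader(spark, "/Volumes/my_catalog/retail_org/customers/", csv_options)`.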
Load data from a message bus
You can configure DLT pipelines to ingest data from message buses. Databricks recommends using streaming tables with continuous execution and enhanced autoscaling to provide the most efficient ingestion for low-latency loading from message buses. See Optimize the cluster utilization of DLT pipelines with Autoscaling.
For example, the following code configures a streaming table to ingest data from Kafka. The Python version uses the Structured Streaming Kafka connector, and the SQL version uses the read_kafka function:
Python:

import dlt

@dlt.table
def kafka_raw():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka_server:9092")
        .option("subscribe", "topic1")
        .load()
    )

SQL:

CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
  bootstrapServers => 'kafka_server:9092',
  subscribe => 'topic1'
);
To ingest from other message bus sources, see:
- Kinesis: read_kinesis
- Pub/Sub topic: read_pubsub
- Pulsar: read_pulsar
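The Kafka source returns the `key` and `value` columns as binary, so a downstream table typically casts them before parsing. As a minimal sketch, assuming the payload is UTF-8 text, the cast expressions can be built once and applied with `DataFrame.selectExpr`. The helper names `kafka_decode_exprs` and `decode_kafka` are hypothetical.

```python
def kafka_decode_exprs(include_metadata=True):
    """Return SQL expressions that cast the binary Kafka key and value to strings."""
    exprs = ["CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value"]
    if include_metadata:
        # topic and timestamp are columns that the Kafka source adds to every record.
        exprs.extend(["topic", "timestamp"])
    return exprs


def decode_kafka(df):
    """Apply the cast expressions to a DataFrame read from Kafka."""
    return df.selectExpr(*kafka_decode_exprs())


print(kafka_decode_exprs())
```

In a pipeline, a second streaming table could return `decode_kafka(dlt.read_stream("kafka_raw"))` to keep the raw table untouched and do the decoding one step downstream.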
Load data from Azure Event Hubs
Azure Event Hubs is a data streaming service that provides an Apache Kafka compatible interface. You can use the Structured Streaming Kafka connector, included in the DLT runtime, to load messages from Azure Event Hubs. To learn more about loading and processing messages from Azure Event Hubs, see Use Azure Event Hubs as a DLT data source.
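As a rough sketch of what the Kafka connector options might look like for Event Hubs, the helper below builds them from a namespace, an event hub name, and a connection string. The function name `event_hubs_kafka_options` and the sample values are hypothetical, and the settings follow the general conventions of the Event Hubs Kafka endpoint (port 9093, SASL_SSL, the PLAIN mechanism, and the `kafkashaded` class prefix used on Databricks) rather than anything stated on this page, so verify them against the linked article.

```python
def event_hubs_kafka_options(namespace, event_hub, connection_string):
    """Build Kafka connector options for the Event Hubs Kafka-compatible endpoint."""
    jaas_config = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        f'required username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "subscribe": event_hub,  # the event hub name plays the role of the topic
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas_config,
    }


options = event_hubs_kafka_options(
    namespace="my-namespace",                      # hypothetical namespace
    event_hub="my-event-hub",                      # hypothetical event hub
    connection_string="<connection-string>",       # read this from a secret
)
print(options["kafka.bootstrap.servers"])
```

The resulting dictionary could be passed to `spark.readStream.format("kafka").options(**options).load()` inside a `@dlt.table` function. Because the connection string is a credential, it is better read from a Databricks secret than written into source code.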
Load data from external systems
DLT supports loading data from any data source supported by Databricks. See Connect to data sources. You can also load external data using Lakehouse Federation for supported data sources. Because Lakehouse Federation requires Databricks Runtime 13.3 LTS or above, you must configure your pipeline to use the preview channel before you can use Lakehouse Federation.
Some data sources do not have equivalent support in SQL. If you cannot use Lakehouse Federation with one of these data sources, you can use a Python notebook to ingest data from the source. You can add Python and SQL source code to the same DLT pipeline. The following example declares a materialized view to access the current state of data in a remote PostgreSQL table:
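import dlt

# table_name, database_host_url, database_name, username, and password are
# placeholders that you must define, for example by reading them from secrets.
@dlt.table
def postgres_raw():
    return (
        spark.read
        .format("postgresql")
        .option("dbtable", table_name)
        .option("host", database_host_url)
        .option("port", 5432)
        .option("database", database_name)
        .option("user", username)
        .option("password", password)
        .load()
    )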
Load small or static datasets from cloud object storage
You can load small or static datasets using Apache Spark load syntax. DLT supports all of the file formats supported by Apache Spark on Databricks. For a full list, see Data format options.
The following examples demonstrate loading JSON to create DLT tables:
Python:

import dlt

@dlt.table
def clickstream_raw():
    return (
        spark.read.format("json")
        .load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json")
    )

SQL:

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
  "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
The read_files SQL function is common to all SQL environments on Databricks. It is the recommended pattern for direct file access using SQL with DLT. For more information, see Options.
Configure a streaming table to ignore changes in a source streaming table
- The skipChangeCommits flag works only with spark.readStream using the option() function. You cannot use this flag in a dlt.read_stream() function.
- You cannot use the skipChangeCommits flag when the source streaming table is defined as the target of an apply_changes() function.
By default, streaming tables require append-only sources. When a streaming table uses another streaming table as a source, and the source streaming table requires updates or deletes (for example, GDPR “right to be forgotten” processing), you can set the skipChangeCommits flag when reading the source streaming table to ignore those changes. For more information about this flag, see Ignore updates and deletes.
import dlt

@dlt.table
def b():
    # Ignore commits in source table A that update or delete existing rows.
    return spark.readStream.option("skipChangeCommits", "true").table("A")
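To illustrate what ignoring change commits means for the downstream table, here is a toy model in plain Python. It is a conceptual sketch only, not Delta Lake or DLT code: the function `rows_seen_by_stream`, the commit dictionaries, and the sample rows are all hypothetical, and the model assumes that a commit is either a pure append or a change.

```python
def rows_seen_by_stream(commits, skip_change_commits=False):
    """Collect the rows that a downstream append-only stream would receive.

    Each commit is a dict with a "version", an "operation" ("append",
    "update", or "delete"), and the "rows" it touched.
    """
    seen = []
    for commit in commits:
        if commit["operation"] == "append":
            seen.extend(commit["rows"])
        elif skip_change_commits:
            # The whole change commit is ignored; nothing is propagated downstream.
            continue
        else:
            raise ValueError(
                f"Version {commit['version']} is a {commit['operation']} commit; "
                "the source is not append-only"
            )
    return seen


commits = [
    {"version": 0, "operation": "append", "rows": ["alice", "bob"]},
    {"version": 1, "operation": "delete", "rows": ["bob"]},
    {"version": 2, "operation": "append", "rows": ["carol"]},
]

print(rows_seen_by_stream(commits, skip_change_commits=True))
```

In this model the delete of `bob` is skipped rather than applied, so the row stays in the downstream result. If the same holds for your pipeline, data removed from the source for compliance reasons would also need to be removed from the downstream table separately.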
Securely access storage credentials with secrets in a pipeline
You can use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration. See Configure compute for a DLT pipeline.
The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage (ADLS) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.
To learn more about working with Azure Data Lake Storage, see Connect to Azure Data Lake Storage and Blob Storage.
You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.
{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}
Replace:
- <storage-account-name> with the ADLS storage account name.
- <scope-name> with the Databricks secret scope name.
- <secret-name> with the name of the key containing the Azure storage account access key.
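If you generate pipeline settings programmatically, the spark_conf entry can be built from the three values above. The following is a minimal sketch: the function name `adls_secret_spark_conf` and the sample account, scope, and secret names are hypothetical, and only the key and value formats come from the settings shown above.

```python
import json


def adls_secret_spark_conf(storage_account, scope, secret_name):
    """Build the spark_conf entry that points an ADLS account key at a secret."""
    # The spark.hadoop. prefix is required on the configuration key.
    key = f"spark.hadoop.fs.azure.account.key.{storage_account}.dfs.core.windows.net"
    # Plain concatenation avoids escaping the literal double braces.
    value = "{{secrets/" + scope + "/" + secret_name + "}}"
    return {key: value}


conf = adls_secret_spark_conf("mystorageaccount", "my-scope", "adls-access-key")
print(json.dumps({"clusters": [{"spark_conf": conf}]}, indent=2))
```

The settings contain only a reference to the secret, so the access key itself never appears in the pipeline definition.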
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"

@dlt.table(
    comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(json_path)
    )
Replace:
- <container-name> with the name of the Azure storage account container that stores the input data.
- <storage-account-name> with the ADLS storage account name.
- <path-to-input-dataset> with the path to the input dataset.
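A small helper can assemble the abfss path and catch a placeholder that was left unreplaced. This is a sketch under assumptions: the function name `abfss_path` and the sample container, account, and directory names are hypothetical, and the placeholder check is a convenience added for illustration.

```python
def abfss_path(container, storage_account, relative_path=""):
    """Build an abfss:// URI for a path in an ADLS storage account container."""
    for name, value in (("container", container), ("storage_account", storage_account)):
        # Reject empty values and values that still contain angle brackets.
        if not value or "<" in value or ">" in value:
            raise ValueError(f"{name} still looks like a placeholder: {value!r}")
    base = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    relative_path = relative_path.strip("/")
    return f"{base}/{relative_path}" if relative_path else base


json_path = abfss_path("raw", "mystorageaccount", "events/json")
print(json_path)
```

The resulting `json_path` can be passed to `.load()` in the same way as the hard-coded string above.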