Using Auto Loader in Delta Live Tables
You can use Auto Loader in your Delta Live Tables pipelines. Delta Live Tables extends functionality in Apache Spark Structured Streaming and allows you to write just a few lines of declarative Python or SQL to deploy a production-quality data pipeline with:
Autoscaling compute infrastructure for cost savings
Data quality checks with expectations
Automatic schema evolution handling
Monitoring via metrics in the event log
You do not need to provide a schema or checkpoint location because Delta Live Tables automatically manages these settings for your pipelines. See Load data with Delta Live Tables.
Auto Loader syntax for DLT
Delta Live Tables provides slightly modified Python syntax for Auto Loader and adds SQL support for Auto Loader.
The following Python and SQL examples use Auto Loader to create datasets from CSV and JSON files:
import dlt

@dlt.table
def customers():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/databricks-datasets/retail-org/customers/")
    )

@dlt.table
def sales_orders_raw():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("/databricks-datasets/retail-org/sales_orders/")
    )
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
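The feature list above mentions data quality checks with expectations; these apply to an Auto Loader table the same way as to any other Delta Live Tables dataset. The following is a minimal Python sketch, in which the valid_customer_id rule and the customer_id column are illustrative assumptions:
@dlt.table
@dlt.expect_or_drop("valid_customer_id", "customer_id IS NOT NULL")  # illustrative rule; the column name is an assumption
def customers_validated():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/databricks-datasets/retail-org/customers/")
    )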
You can use supported format options with Auto Loader. Using the map() function, you can pass options to the cloud_files() method. Options are key-value pairs, where the keys and values are strings. The following describes the syntax for working with Auto Loader in SQL:
CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option-value>",
      "<option-key>", "<option-value>",
      ...
    )
  )
The following example reads data from tab-delimited CSV files with a header:
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
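In Python, the equivalent format options are passed with .option() calls rather than map(). The following sketch mirrors the tab-delimited CSV read above; the table name customers_tsv is arbitrary:
@dlt.table
def customers_tsv():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("delimiter", "\t")   # tab-delimited files
            .option("header", "true")    # first row contains column names
            .load("/databricks-datasets/retail-org/customers/")
    )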
You can use the schema option to specify the schema manually; you must specify the schema for formats that do not support schema inference:
@dlt.table
def wiki_raw():
    return (
        spark.readStream.format("cloudFiles")
            .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
            .option("cloudFiles.format", "parquet")
            .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
    )
CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )
Note
Delta Live Tables automatically configures and manages the schema and checkpoint directories when using Auto Loader to read files. However, if you manually configure either of these directories, performing a full refresh does not affect the contents of the configured directories. Databricks recommends using the automatically configured directories to avoid unexpected side effects during processing.
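For illustration, manually configuring the schema directory uses the cloudFiles.schemaLocation option; in the following sketch the path is a placeholder, and Databricks recommends omitting the option so Delta Live Tables manages the directory for you:
@dlt.table
def sales_orders_manual_schema():
    return (
        spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            # Placeholder path; a manually configured schema location is not cleared by a full refresh.
            .option("cloudFiles.schemaLocation", "/tmp/schemas/sales_orders")
            .load("/databricks-datasets/retail-org/sales_orders/")
    )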