Using Auto Loader in Delta Live Tables

You can use Auto Loader in your Delta Live Tables pipelines. Delta Live Tables extends the functionality of Apache Spark Structured Streaming and lets you deploy a production-quality data pipeline by writing just a few lines of declarative Python or SQL.

You do not need to provide a schema or checkpoint location because Delta Live Tables automatically manages these settings for your pipelines. See Delta Live Tables data sources.

Auto Loader syntax for DLT

Delta Live Tables provides slightly modified Python syntax for Auto Loader, and adds SQL support for Auto Loader.

The following examples use Auto Loader to create datasets from CSV and JSON files:

import dlt

# `spark` is provided by the Databricks runtime in pipeline notebooks.
@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

The equivalent SQL:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

You can pass any supported format option to Auto Loader. In SQL, use the map() function to pass any number of options to the cloud_files() function. Options are key-value pairs in which both keys and values are strings. The following shows the syntax for working with Auto Loader in SQL:

CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
  FROM cloud_files(
    "<file_path>",
    "<file_format>",
    map(
      "<option_key>", "<option_value>",
      "<option_key>", "<option_value>",
      ...
    )
  )
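To make the flattening of options into map() concrete, the following plain-Python sketch renders a statement in this shape from a dictionary of options. The helpers `sql_string` and `cloud_files_sql` are hypothetical and are not part of Delta Live Tables; they assume Spark SQL's default backslash escaping inside double-quoted string literals. The usage at the end reproduces the tab-delimited example that follows.

```python
from typing import Dict, Optional


def sql_string(value: str) -> str:
    """Quote a Python string as a double-quoted Spark SQL string literal.

    Assumption: Spark SQL's default escaping, where \\t and \\n inside a
    literal are read back as a tab and a newline.
    """
    escaped = (
        value.replace("\\", "\\\\")  # escape backslashes first
        .replace('"', '\\"')         # then embedded double quotes
        .replace("\t", "\\t")        # a real tab becomes the \t escape
        .replace("\n", "\\n")        # a real newline becomes the \n escape
    )
    return f'"{escaped}"'


def cloud_files_sql(table_name: str, path: str, file_format: str,
                    options: Optional[Dict[str, str]] = None) -> str:
    """Hypothetical helper: build a CREATE OR REFRESH STREAMING LIVE TABLE
    statement that reads `path` with cloud_files().

    Options are flattened into map() as alternating key, value literals.
    """
    args = [sql_string(path), sql_string(file_format)]
    if options:
        pairs = []
        for key, value in options.items():
            # Both keys and values are passed to map() as strings.
            pairs.extend([sql_string(str(key)), sql_string(str(value))])
        args.append(f"map({', '.join(pairs)})")
    return (
        f"CREATE OR REFRESH STREAMING LIVE TABLE {table_name}\n"
        f"AS SELECT * FROM cloud_files({', '.join(args)})"
    )


stmt = cloud_files_sql(
    "customers",
    "/databricks-datasets/retail-org/customers/",
    "csv",
    {"delimiter": "\t", "header": "true"},
)
print(stmt)
```

Each key and each value is emitted as its own string literal, which is why the argument list of map() alternates key, value, key, value.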

The following example reads data from tab-delimited CSV files with a header:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))
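Before pointing Auto Loader at a directory, you might sanity-check that `delimiter` and `header` match your files by parsing a few lines locally. The sketch below uses Python's standard `csv` module, not Auto Loader, and the sample text and column names are hypothetical. It shows what the two options describe: the first line supplies the column names, and each later line becomes one record split on tabs.

```python
import csv
import io
from typing import Dict, List

# Hypothetical sample: a header line plus one record, tab-separated.
SAMPLE = "id\tname\tcity\n1\tAcme Corp\tSpringfield\n"


def preview_rows(text: str, delimiter: str = "\t") -> List[Dict[str, str]]:
    """Parse text the way delimiter="\\t" plus header="true" describes it:
    the first line gives column names and every later line is one record."""
    reader = csv.DictReader(io.StringIO(text), delimiter=delimiter)
    return [dict(row) for row in reader]


for row in preview_rows(SAMPLE):
    print(row)
```

If the preview shows a single column containing tab characters, or a header value appearing as data, the options passed to cloud_files() likely do not match the files.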

You can specify the schema manually, with the .schema() method in Python or the "schema" option in SQL. You must specify the schema for formats that do not support schema inference:

import dlt

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

The equivalent SQL:

CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )
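Because the Python and SQL versions repeat the same DDL schema string, one option is to build it once from (name, type) pairs so the two cannot drift apart. This is a minimal sketch; `WIKI_COLUMNS`, `ddl_schema`, and `WIKI_SCHEMA` are hypothetical names, and the column list is copied from the example above.

```python
from typing import List, Tuple

# Hypothetical single source of truth for the wiki_raw columns.
WIKI_COLUMNS: List[Tuple[str, str]] = [
    ("title", "STRING"),
    ("id", "INT"),
    ("revisionId", "INT"),
    ("revisionTimestamp", "TIMESTAMP"),
    ("revisionUsername", "STRING"),
    ("revisionUsernameId", "INT"),
    ("text", "STRING"),
]


def ddl_schema(columns: List[Tuple[str, str]]) -> str:
    """Join (name, type) pairs into a DDL-formatted schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


WIKI_SCHEMA = ddl_schema(WIKI_COLUMNS)
print(WIKI_SCHEMA)
```

In a Python pipeline you could then pass `WIKI_SCHEMA` to `.schema()`, which accepts a DDL-formatted string as well as a `StructType`, and paste or template the same string into the SQL `map("schema", ...)` call.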

Note

Delta Live Tables automatically configures and manages the schema and checkpoint directories when using Auto Loader to read files. However, if you manually configure either of these directories, performing a full refresh does not affect the contents of the configured directories. Databricks recommends using the automatically configured directories to avoid unexpected side effects during processing.