Ingest data into Delta Live Tables

You can use the following external data sources to create datasets:

  • Any data source that Databricks Runtime directly supports.

  • Any file in cloud storage such as Azure Data Lake Storage Gen2 (ADLS Gen2), AWS S3, or Google Cloud Storage (GCS).

  • Any file stored in DBFS.

Databricks recommends using Auto Loader for pipelines that read data from supported file formats, particularly for streaming live tables that operate on continually arriving data. Auto Loader is scalable, efficient, and supports schema inference.

Python datasets can use the Apache Spark built-in file data sources to read data in a batch operation from file formats not supported by Auto Loader.

SQL datasets can use Delta Live Tables file sources to read data in a batch operation from file formats not supported by Auto Loader.

Auto Loader

The following examples use Auto Loader to create datasets from CSV and JSON files, first in Python and then in SQL:

import dlt

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

You can use supported format options with Auto Loader. Using the map() function, you can pass any number of options to the cloud_files() method. Options are key-value pairs, where the keys and values are strings. The following describes the syntax for working with Auto Loader in SQL:

CREATE OR REFRESH STREAMING LIVE TABLE <table_name>
AS SELECT *
  FROM cloud_files(
    "<file_path>",
    "<file_format>",
    map(
      "<option_key>", "<option_value",
      "<option_key>", "<option_value",
      ...
    )
  )

The following example reads data from tab-delimited CSV files with a header:

CREATE OR REFRESH STREAMING LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

You can specify the schema manually; you must provide a schema for formats that do not support schema inference. The following examples define the schema first in Python and then in SQL:

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

CREATE OR REFRESH STREAMING LIVE TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

Note

Delta Live Tables automatically configures and manages the schema and checkpoint directories when using Auto Loader to read files. However, if you manually configure either of these directories, performing a full refresh does not affect the contents of the configured directories. Databricks recommends using the automatically configured directories to avoid unexpected side effects during processing.

Apache Spark file sources

To read files in a batch operation when defining datasets in Python, you can use standard PySpark functions. The following example reads Parquet data from files using the PySpark spark.read.format("parquet").load() function:

@dlt.table
def lendingclub_raw_data():
  return (
    spark.read.format("parquet").load("/databricks-datasets/samples/lending_club/parquet/")
  )

Spark SQL file sources

To read files in a batch operation when defining datasets in SQL, you can use Spark SQL syntax. The following example reads Parquet data from files:

CREATE OR REFRESH LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`