Delta Live Tables quickstart

Preview

This feature is in Public Preview. Contact your Databricks representative to request access.

You can easily create and run a Delta Live Tables pipeline using a Databricks notebook. This article demonstrates using a Delta Live Tables pipeline on a dataset containing Wikipedia clickstream data to:

  • Read the raw JSON clickstream data into a table.
  • Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.
  • Use the records from the cleansed data table to make Delta Live Tables queries that create derived datasets.

In this quickstart, you will:

  1. Create a new notebook and add the code to implement the pipeline.
  2. Create a new pipeline job using the notebook.
  3. Start an update of the pipeline job.
  4. View results of the pipeline job.

Requirements

You must have cluster creation permissions to start a pipeline. The Delta Live Tables runtime creates a cluster before it runs your pipeline and fails if you don’t have the correct permissions.

Create a notebook

You can use an example notebook or create a new notebook to run the Delta Live Tables pipeline:

  1. Go to your Databricks landing page and select Create Blank Notebook.

  2. In the Create Notebook dialog, give your notebook a name and select Python or SQL from the Default Language dropdown menu. You can leave Cluster set to the default value. The Delta Live Tables runtime creates a cluster before it runs your pipeline.

  3. Click Create.

  4. Copy the Python or SQL code example and paste it into your new notebook. You can add the example code to a single cell of the notebook or multiple cells.

    Note

    You must start your pipeline from the Delta Live Tables tab of the Jobs user interface. Clicking the run icon to run your pipeline returns an error.

Code example

Python

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *


json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.create_table(
  comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
  table_properties={
    "quality": "bronze"
  }
)
def clickstream_raw():
  return (
    spark.read.json(json_path)
  )


@dlt.create_table(
  comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
  table_properties={
    "quality": "silver"
  }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
  return (
    dlt.read("clickstream_raw")
      .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )


@dlt.create_table(
  comment="A table of the most common pages that link to the Apache Spark page.",
  table_properties={
    "quality": "gold"
  }
)
def top_spark_referrers():
  return (
    dlt.read("clickstream_clean")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("click_count"))
      .select("referrer", "click_count")
      .limit(10)
  )


@dlt.create_table(
  comment="A list of the top 50 pages by number of clicks.",
  table_properties={
    "quality": "gold"
  }
)
def top_pages():
  return (
    dlt.read("clickstream_clean")
      .groupBy("current_page_title")
      .agg(sum("click_count").alias("total_clicks"))
      .sort(desc("total_clicks"))
      .limit(50)
  )
SQL

CREATE LIVE TABLE clickstream_raw
COMMENT "The raw wikipedia click stream dataset, ingested from /databricks-datasets."
TBLPROPERTIES ("quality" = "bronze")
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`

CREATE LIVE TABLE clickstream_clean(
  CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL),
  CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations."
TBLPROPERTIES ("quality" = "silver")
AS SELECT
  CAST (curr_id AS INT) AS current_page_id,
  curr_title AS current_page_title,
  CAST(n AS INT) AS click_count,
  CAST (prev_id AS INT) AS previous_page_id,
  prev_title AS previous_page_title
FROM live.clickstream_raw

CREATE LIVE TABLE top_spark_referrers
COMMENT "A table of the most common pages that link to the Apache Spark page."
TBLPROPERTIES ("quality" = "gold")
AS SELECT
  previous_page_title as referrer,
  click_count
FROM live.clickstream_clean
WHERE current_page_title = 'Apache_Spark'
ORDER BY click_count DESC
LIMIT 10

CREATE LIVE TABLE top_pages
COMMENT "A list of the top 50 pages by number of clicks."
TBLPROPERTIES ("quality" = "gold")
AS SELECT
  current_page_title,
  SUM(click_count) as total_clicks
FROM live.clickstream_clean
GROUP BY current_page_title
ORDER BY 2 DESC
LIMIT 50

Create a pipeline

To create a new pipeline using the Delta Live Tables notebook:

  1. Click Jobs in the sidebar, click the Pipelines tab, and click Create Pipeline.
  2. Give the pipeline a name and enter the full path to the notebook.
  3. Optionally enter a storage location for output data from the pipeline. The system uses a default location if you leave Storage Location empty.
  4. Select Triggered for Pipeline Mode.
  5. Click Create.

The system displays the Pipeline Details page after you click Create. You can also access your pipeline by clicking the pipeline name in the Pipelines tab.
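The Create Pipeline form is the standard way to create a pipeline in this preview. As an optional alternative, the sketch below creates an equivalent pipeline from Python through the Pipelines REST API; the endpoint path, the field names (name, libraries, storage, continuous), and the placeholder host, token, and notebook path are assumptions to verify against the API documentation available to your workspace.

# Sketch: create a Delta Live Tables pipeline with the Pipelines REST API.
# All <...> values are placeholders; field names are assumptions to verify.
import requests

DATABRICKS_HOST = "https://<your-workspace-url>"
TOKEN = "<your-personal-access-token>"

pipeline_spec = {
    "name": "Wikipedia clickstream quickstart",
    # Same as the notebook path you enter in the Create Pipeline form.
    "libraries": [{"notebook": {"path": "/Users/<you>/<your-notebook>"}}],
    # Optional; omit to let the system pick a default storage location.
    "storage": "/pipelines/wikipedia-quickstart",
    # Triggered pipeline mode corresponds to continuous = False.
    "continuous": False,
}

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/pipelines",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json=pipeline_spec,
)
response.raise_for_status()
print(response.json())  # includes the pipeline_id of the new pipeline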

Start the pipeline

To start an update for the new pipeline, click the Start button in the top panel. The system returns a message confirming that your pipeline is starting.

After successfully starting the update, the Delta Live Tables system:

  1. Starts a cluster using a cluster configuration created by the Delta Live Tables system. You can also specify a custom cluster configuration.
  2. Creates any tables that don’t exist and ensures that the schema is correct for any existing tables.
  3. Updates tables with the latest data available.
  4. Shuts down the cluster when the update is complete.

You can track the progress of the update by viewing the event log at the bottom of the Pipeline Details page.
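If you want to trigger updates outside the UI, the sketch below starts an update through the REST API instead of the Start button; the updates endpoint and the placeholder host, token, and pipeline ID are assumptions to confirm against the Pipelines API documentation for your workspace.

# Sketch: start a pipeline update via the REST API instead of the Start button.
# All <...> values are placeholders; the endpoint is an assumption to verify.
import requests

DATABRICKS_HOST = "https://<your-workspace-url>"
TOKEN = "<your-personal-access-token>"
PIPELINE_ID = "<pipeline-id>"  # returned when the pipeline was created

response = requests.post(
    f"{DATABRICKS_HOST}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()
print(response.json())  # includes the update_id of the new update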

View results

You can use the Delta Live Tables user interface to view pipeline processing details, including a visual view of the pipeline graph and schemas, and processing details such as the number of records processed and the number of records that fail validation.

View the pipeline graph

To view the processing graph for your pipeline, click the Graph tab. You can use your mouse or the buttons in the upper-right corner of the graph panel to adjust the view.

View dataset information

Click a dataset to view schema information for the dataset.

View processing details

To view processing details, including the number of records processed and data quality metrics, select an entry in the event log and click the JSON tab.
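You can also query these events outside the UI. The sketch below assumes the event log is persisted as a Delta table under the pipeline's storage location at a /system/events path, and that it exposes timestamp, event_type, and message columns; treat the path and the column names as assumptions to verify for your release.

-- Sketch: read recent pipeline events from the event log Delta table.
-- <storage-location> is your pipeline's storage location; the path and
-- column names are assumptions to verify.
SELECT timestamp, event_type, message
FROM delta.`<storage-location>/system/events`
ORDER BY timestamp DESC
LIMIT 20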

View pipeline settings

Click the Settings tab to view the generated configuration for your pipeline. Click the Edit Settings button to modify the pipeline configuration. See Delta Live Tables settings for details on configuration settings.

Publish datasets

You can make pipeline output data available for querying by publishing tables to the Databricks metastore:

  1. Click the Edit Settings button.

  2. Add the target setting to configure a database name for your tables.

  3. Click Save.

  4. Click the Start button to start a new update for your pipeline.

After the update completes, you can view the database and tables, query the data, or use the data in downstream applications.
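For example, if you set target to the hypothetical database name wikipedia_quickstart, queries like the following would read the published tables (substitute the database name you configured):

-- Sketch: query the published tables; wikipedia_quickstart is a hypothetical
-- database name standing in for whatever you set as the target.
SELECT * FROM wikipedia_quickstart.top_pages;

SELECT referrer, click_count
FROM wikipedia_quickstart.top_spark_referrers
ORDER BY click_count DESC;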

Example notebooks

These notebooks provide Python and SQL examples that implement a Delta Live Tables pipeline to:

  • Read raw JSON clickstream data into a table.
  • Read the records from the raw data table and use Delta Live Tables expectations to create a new table that contains cleansed data.
  • Use the records from the cleansed data table to make Delta Live Tables queries that create derived datasets.

Get started with Delta Live Tables Python notebook

Get started with Delta Live Tables SQL notebook