Run your first end-to-end analytics pipeline in the Databricks Lakehouse

Databricks provides a suite of production-ready tools that allow data professionals to quickly develop and deploy extract, transform, and load (ETL) pipelines. Unity Catalog allows data stewards to configure and secure storage credentials, external locations, and database objects for users throughout an organization. Databricks SQL allows analysts to run SQL queries against the same tables used in production ETL workloads, enabling real-time business intelligence at scale.

By the end of this article, you will feel comfortable:

  1. Launching a Unity Catalog enabled compute cluster.

  2. Creating a Databricks notebook.

  3. Writing and reading data from a Unity Catalog external location.

  4. Configuring incremental data ingestion to a Unity Catalog table with Auto Loader.

  5. Executing notebook cells to process, query, and preview data.

  6. Scheduling a notebook as a Databricks job.

  7. Querying Unity Catalog tables from Databricks SQL.

This tutorial uses interactive notebooks to complete common ETL tasks in Python on Unity Catalog enabled clusters. If you are not using Unity Catalog, see Run your first ETL workload on Databricks.

Requirements

  • You are logged into Databricks, and you’re in the Data Science & Engineering workspace. For more information, see Sign up for a free trial.

Note

If you do not have cluster control privileges, you can still complete most of the steps below as long as you have access to a cluster.

If you only have access to the Databricks SQL workspace, see: Get started with Databricks as a data analyst.

Step 1: Create a cluster

To do exploratory data analysis and data engineering, create a cluster to provide the compute resources needed to execute commands.

  1. Click compute icon Compute in the sidebar.

  2. On the Compute page, click Create Cluster. This opens the New Cluster page.

  3. Specify a unique name for the cluster.

  4. Select the Single node radio button.

  5. Select Single user from the Access mode dropdown.

  6. Make sure your email address is visible in the Single user access field.

  7. Select a Databricks Runtime version of 11.1 or above in order to use Unity Catalog.

  8. Click Create Cluster.

To learn more about Databricks clusters, see Clusters.

Step 2: Create a Databricks notebook

To get started writing and executing interactive code on Databricks, create a notebook.

  1. Click Create Icon Create in the sidebar, then click Notebook.

  2. On the Create Notebook page:

    • Specify a unique name for your notebook.

    • Make sure the default language is set to Python.

    • Select the cluster you created in step 1 from the Cluster dropdown.

    • Click Create.

A notebook opens with an empty cell at the top.

To learn more about creating and managing notebooks, see Manage notebooks.

Step 3: Write and read data from an external location managed by Unity Catalog

Databricks recommends using Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.

You can use Unity Catalog to manage secure access to external locations. Users or service principals with READ FILES permissions on an external location can use Auto Loader to ingest data.

Normally, data will arrive in an external location due to writes from other systems. In this demo, you can simulate data arrival by writing out JSON files to an external location.

Copy the code below into a notebook cell. Replace the string value for catalog with the name of a catalog with CREATE and USAGE permissions. Replace the string value for external_location with the path for an external location with READ FILES, WRITE FILES, and CREATE TABLE permissions.

External locations can be defined as an entire storage container, but often point to a directory nested in a container.

The correct format for an external location path is "s3://bucket-name/path/to/external_location".
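If you want to sanity-check the path string before running the notebook, a small helper like the following can verify that it matches this format. This helper is illustrative only and not part of the tutorial:

```python
import re

# Illustrative helper (not part of the Databricks tutorial): check that a
# string looks like a valid S3-style external location path, i.e.
# "s3://<bucket>/<optional/nested/path>" with no trailing slash.
def looks_like_external_location(path: str) -> bool:
    return re.match(r"^s3://[a-z0-9][a-z0-9.-]{1,61}[a-z0-9](/[^/]+)*$", path) is not None

print(looks_like_external_location("s3://bucket-name/path/to/external_location"))  # True
print(looks_like_external_location("bucket-name/path"))  # False
```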


 external_location = "<your_external_location>"
 catalog = "<your_catalog>"

 dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/foobar.txt"))
 dbutils.fs.rm(f"{external_location}/foobar.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Executing this cell should print a line indicating that 12 bytes were written, print the string “Hello world!”, and display all the databases present in the catalog you provided. If you are unable to get this cell to run, confirm that you are in a Unity Catalog enabled workspace and request the proper permissions from your workspace administrator to complete this tutorial.

The Python code below uses your email address to create a unique database in the catalog you provided and a unique storage location in the external location you provided. Executing this cell removes all data associated with this tutorial, allowing you to execute this example idempotently. A class is defined and instantiated that you will use to simulate batches of data arriving from a connected system to your source external location.

Copy this code to a new cell in your notebook and execute it to configure your environment.

Note

The variables defined in this code should allow you to safely execute it without risk of conflicting with existing workspace assets or other users. Restricted network or storage permissions will raise errors when executing this code; contact your workspace administrator to troubleshoot these restrictions.
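The setup code below uses a SQL `regexp_replace` to turn the current user's email address into an identifier-safe string. A pure-Python equivalent of that transformation, shown here for illustration only, looks like this:

```python
import re

# Illustrative equivalent of regexp_replace(current_user(), '[^a-zA-Z0-9]', '_'):
# replace every character that is not a letter or digit with an underscore.
def sanitize_username(user: str) -> str:
    return re.sub(r"[^a-zA-Z0-9]", "_", user)

print(sanitize_username("first.last@example.com"))  # first_last_example_com
```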


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)


# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(self.source)
        except Exception:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)
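The date logic in `LoadData` advances one day per batch, starting at 2016-01-01 and raising once the batch date reaches March. A standalone sketch of that progression, assuming the same start date and cutoff, shows why you can land at most 60 batches:

```python
from datetime import date, timedelta

# Illustrative sketch of the LoadData.get_date() progression: one batch per
# day starting 2016-01-01, raising once the batch date reaches March.
def next_batch_date(last_date=None):
    if last_date is None:
        return date(2016, 1, 1)  # first batch
    batch_date = last_date + timedelta(days=1)
    if batch_date.month == 3:
        raise Exception("Source data exhausted")
    return batch_date

d = next_batch_date()
count = 1
try:
    while True:
        d = next_batch_date(d)
        count += 1
except Exception:
    pass
print(count)  # 60 batches: 31 days in January + 29 in February 2016
```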

You can now land a batch of data by copying the following code into a cell and executing it. You can manually execute this cell up to 60 times to trigger new data arrival.

RawData.land_batch()

Step 4: Configure Auto Loader to ingest data to Unity Catalog

Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.

To configure Auto Loader to ingest data to a Unity Catalog table, copy and paste the following code into an empty cell in your notebook:

# Import functions
from pyspark.sql.functions import input_file_name, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", input_file_name().alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

To learn more about Auto Loader, see What is Auto Loader?.

To learn more about Structured Streaming with Unity Catalog, see Using Unity Catalog with Structured Streaming.

Step 5: Process and interact with data

Notebooks execute logic cell-by-cell. Use these steps to execute the logic in your cell:

  1. To run the cell you completed in the previous step, select the cell and press SHIFT+ENTER.

  2. To query the table you’ve just created, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    df = spark.read.table(table)
    
  3. To preview the data in your DataFrame, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    display(df)
    

To learn more about interactive options for visualizing data, see Visualizations.

Step 6: Schedule a job

You can run Databricks notebooks as production scripts by adding them as a task in a Databricks job. In this step, you will create a new job that you can trigger manually.

To schedule your notebook as a task:

  1. Click Schedule on the right side of the header bar.

  2. Enter a unique name for the Job name.

  3. Click Manual.

  4. In the Cluster drop-down, select the cluster you created in step 1.

  5. Click Create.

  6. In the window that appears, click Run now.

  7. To see the job run results, click the External Link icon next to the Last run timestamp.

For more information on jobs, see Create, run, and manage Databricks Jobs.

Step 7: Query table from Databricks SQL

Anyone with USAGE permissions on the current catalog and database and SELECT permissions on the table can query the contents of the table from their preferred Databricks API.

You can switch to the Databricks SQL UI using the persona switcher above the + in the top left of the screen. Select SQL from the dropdown menu.

You need access to a running SQL warehouse to execute queries in Databricks SQL.

The table you created earlier in this tutorial has the name target_table. You can query it using the catalog you provided in the first cell and a database with the pattern e2e_lakehouse_<your_username>_db. You can use the Data Explorer to find the data objects you created.
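For example, assuming a catalog named main and a sanitized username of first_last_example_com (both illustrative values, not part of the tutorial), the fully qualified table name is built like this:

```python
# Illustrative values; substitute your own catalog and sanitized username.
catalog = "main"
username = "first_last_example_com"

qualified_table = f"{catalog}.e2e_lakehouse_{username}_db.target_table"
print(qualified_table)  # main.e2e_lakehouse_first_last_example_com_db.target_table
```

In the Databricks SQL editor you would then query that name directly, for example with SELECT * FROM main.e2e_lakehouse_first_last_example_com_db.target_table.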

Additional Integrations

Learn more about integrations and tools for data engineering with Databricks: