Run your first ETL workload on Databricks

Learn how to use production-ready tools from Databricks to develop and deploy your first extract, transform, and load (ETL) pipelines for data orchestration.

By the end of this article, you will feel comfortable:

  1. Launching a Databricks all-purpose compute cluster.

  2. Creating a Databricks notebook.

  3. Configuring incremental data ingestion to Delta Lake with Auto Loader.

  4. Executing notebook cells to process, query, and preview data.

  5. Scheduling a notebook as a Databricks job.

This tutorial uses interactive notebooks to complete common ETL tasks in Python or Scala.

You can also use Delta Live Tables to build ETL pipelines. Databricks created Delta Live Tables to reduce the complexity of building, deploying, and maintaining production ETL pipelines. See Tutorial: Run your first Delta Live Tables pipeline.

You can also use the Databricks Terraform provider to create this article’s resources. See Create clusters, notebooks, and jobs with Terraform.

Requirements

Note

If you do not have cluster control privileges, you can still complete most of the steps below as long as you have access to a cluster.

Step 1: Create a cluster

To do exploratory data analysis and data engineering, create a cluster to provide the compute resources needed to execute commands.

  1. Click Compute in the sidebar.

  2. On the Compute page, click Create Cluster. This opens the New Cluster page.

  3. Specify a unique name for the cluster, leave the remaining values in their default state, and click Create Cluster.

To learn more about Databricks clusters, see Compute.
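If you prefer to script cluster creation rather than use the UI, the request can be sketched as below. This is a minimal sketch, assuming the request body is sent to the Databricks Clusters API `clusters/create` endpoint. Unlike the UI, the API does not fill in defaults for the runtime version and node type, so the values shown are placeholders you must replace, and the helper name `build_cluster_payload` is hypothetical.

```python
import json


def build_cluster_payload(cluster_name: str, spark_version: str,
                          node_type_id: str, num_workers: int = 1) -> dict:
    """Build a request body for the Clusters API `clusters/create` endpoint."""
    return {
        "cluster_name": cluster_name,
        "spark_version": spark_version,    # runtime version key for your workspace
        "node_type_id": node_type_id,      # node type offered by your cloud provider
        "num_workers": num_workers,
        "autotermination_minutes": 60,     # assumption: stop after an hour of inactivity
    }


# Placeholder values: replace them with ones that are valid in your workspace.
cluster_payload = build_cluster_payload(
    cluster_name="etl-quickstart-cluster",
    spark_version="<runtime-version-key>",
    node_type_id="<node-type-id>",
)
print(json.dumps(cluster_payload, indent=2))
```

The sketch only builds and prints the JSON body; sending it requires your workspace URL and an access token, which are omitted here.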

Step 2: Create a Databricks notebook

To create a notebook in your workspace, click New in the sidebar, and then click Notebook. A blank notebook opens in the workspace.

To learn more about creating and managing notebooks, see Manage notebooks.

Step 3: Configure Auto Loader to ingest data to Delta Lake

Databricks recommends using Auto Loader for incremental data ingestion. Auto Loader automatically detects and processes new files as they arrive in cloud object storage.
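The idea behind incremental ingestion can be illustrated without Spark. The following sketch is a conceptual illustration only, not how Auto Loader is implemented and not part of the tutorial notebook. It assumes a local directory stands in for cloud object storage and a JSON file stands in for the checkpoint. The names `discover_new_files` and `ingest_once` are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def discover_new_files(source_dir: Path, checkpoint_file: Path) -> list[Path]:
    """Return files in source_dir that the checkpoint has not recorded yet."""
    seen = set(json.loads(checkpoint_file.read_text())) if checkpoint_file.exists() else set()
    return sorted(p for p in source_dir.glob("*.json") if p.name not in seen)


def ingest_once(source_dir: Path, checkpoint_file: Path) -> list[str]:
    """Process only new files, then record them so later runs skip them."""
    new_files = discover_new_files(source_dir, checkpoint_file)
    seen = json.loads(checkpoint_file.read_text()) if checkpoint_file.exists() else []
    # A real pipeline would parse and write each file here; this sketch only records names.
    checkpoint_file.write_text(json.dumps(seen + [p.name for p in new_files]))
    return [p.name for p in new_files]


# Demo: two runs over a temporary directory (a stand-in for cloud storage).
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "events"
    source.mkdir()
    checkpoint = Path(tmp) / "checkpoint.json"

    (source / "a.json").write_text("{}")
    first_run = ingest_once(source, checkpoint)    # picks up a.json

    (source / "b.json").write_text("{}")
    second_run = ingest_once(source, checkpoint)   # picks up only b.json
```

The second run skips `a.json` because the checkpoint already lists it. In the tutorial code, the `checkpointLocation` option plays the analogous role of remembering what has already been ingested.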

Databricks recommends storing data with Delta Lake. Delta Lake is an open source storage layer that provides ACID transactions and enables the data lakehouse. Delta Lake is the default format for tables created in Databricks.

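To configure Auto Loader to ingest data to a Delta Lake table, copy and paste the following code into the empty cell in your notebook. The Python version is shown first, followed by the Scala equivalent; use the one that matches your notebook's language: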

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

// Scala equivalent of the Python code above
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint/etl_quickstart"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Note

The variables defined in this code should allow you to run it safely, without conflicting with existing workspace assets or other users. If network or storage permissions are restricted in your workspace, the code raises errors; contact your workspace administrator to troubleshoot these restrictions.
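To see why the derived names are unlikely to collide, the following plain-Python sketch mirrors the `regexp_replace` call used in the code above. The user name is a hypothetical example, and `sanitize_username` is an illustrative helper, not a Databricks function.

```python
import re


def sanitize_username(user: str) -> str:
    """Mirror regexp_replace(current_user(), '[^a-zA-Z0-9]', '_') in plain Python."""
    return re.sub(r"[^a-zA-Z0-9]", "_", user)


# Hypothetical user name, for illustration only.
username = sanitize_username("first.last@example.com")
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

print(table_name)       # first_last_example_com_etl_quickstart
print(checkpoint_path)  # /tmp/first_last_example_com/_checkpoint/etl_quickstart
```

Because every character outside letters and digits becomes an underscore, the result is safe to use in a table name, and each user gets a separate table and checkpoint directory.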

To learn more about Auto Loader, see What is Auto Loader?.

Step 4: Process and interact with data

Notebooks execute logic cell-by-cell. To execute the logic in your cell:

  1. To run the cell you completed in the previous step, select the cell and press SHIFT+ENTER.

  2. To query the table you’ve just created, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    # Python
    df = spark.read.table(table_name)

    // Scala
    val df = spark.read.table(table_name)
    
  3. To preview the data in your DataFrame, copy and paste the following code into an empty cell, then press SHIFT+ENTER to run the cell.

    # Python
    display(df)

    // Scala
    display(df)
    

To learn more about interactive options for visualizing data, see Visualizations in Databricks notebooks.
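As an optional follow-up, you can check how many records each ingested file contributed. The sketch below assumes the DataFrame has the `source_file` column added by the ingestion code in step 3. The function name `rows_per_source_file` is hypothetical.

```python
def rows_per_source_file(df):
    """Return one row per ingested file with the number of records it contributed."""
    # groupBy(...).count() yields the columns `source_file` and `count`.
    return df.groupBy("source_file").count().orderBy("source_file")


# In a Databricks notebook, after `df` has been defined as shown above:
# display(rows_per_source_file(df))
```

The function takes the DataFrame as a parameter instead of reading a global variable, so you can reuse it for any table that has a `source_file` column.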

Step 5: Schedule a job

You can run Databricks notebooks as production scripts by adding them as a task in a Databricks job. In this step, you will create a new job that you can trigger manually.

To schedule your notebook as a task:

  1. Click Schedule on the right side of the header bar.

  2. Enter a unique name for the Job name.

  3. Click Manual.

  4. In the Cluster drop-down, select the cluster you created in step 1.

  5. Click Create.

  6. In the window that appears, click Run now.

  7. To see the job run results, click the External Link icon next to the Last run timestamp.

For more information on jobs, see What are Databricks jobs?.
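The same job can also be described programmatically. The following is a minimal sketch, assuming the request body is sent to the Jobs API 2.1 `jobs/create` endpoint. The job name, notebook path, cluster ID, and task key are placeholders, and `build_job_payload` is a hypothetical helper.

```python
import json


def build_job_payload(job_name: str, notebook_path: str, cluster_id: str) -> dict:
    """Build a Jobs API 2.1 `jobs/create` request body with one notebook task."""
    return {
        "name": job_name,
        "tasks": [
            {
                "task_key": "etl_quickstart",       # hypothetical task key
                "notebook_task": {"notebook_path": notebook_path},
                "existing_cluster_id": cluster_id,  # the cluster created in step 1
            }
        ],
        # No "schedule" block: the job runs only when triggered manually.
    }


# Placeholder values: replace them with your own notebook path and cluster ID.
job_payload = build_job_payload(
    job_name="etl-quickstart-job",
    notebook_path="/Users/someone@example.com/etl-quickstart",
    cluster_id="<your-cluster-id>",
)
print(json.dumps(job_payload, indent=2))
```

Leaving out a schedule corresponds to choosing Manual in the UI. The sketch stops at building the JSON body; sending it requires your workspace URL and an access token.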

Additional integrations

Learn more about integrations and tools for data engineering with Databricks: