Managing dependencies in data pipelines

Developing and deploying a data processing pipeline often requires managing complex dependencies between tasks. For example, a pipeline might read data from a source, clean the data, transform the cleaned data, and write the transformed data to a target. When you operationalize a pipeline, you also need to test, schedule, and troubleshoot it.

Workflow systems address these challenges by letting you define dependencies between tasks, schedule when pipelines run, and monitor workflows. Databricks recommends using jobs with multiple tasks to manage your workflows without relying on an external system. Databricks jobs provide task orchestration with standard authentication and access control methods, and you can create and manage complex workflows through a familiar, user-friendly interface. A job contains one or more tasks, each of which runs code such as a notebook or JAR. You control the execution order of the tasks by specifying dependencies between them, so a job's tasks can run in sequence or in parallel.
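To make dependency-driven ordering concrete, here is a minimal Python sketch. It models tasks roughly the way a Databricks Jobs API 2.1 job definition describes them: each task has a `task_key` and an optional `depends_on` list. The sketch then groups the tasks into stages. Tasks in the same stage do not depend on each other and could run in parallel, while the stages themselves run in sequence. The task keys are hypothetical, and the grouping function illustrates the idea only; it is not how Databricks schedules tasks internally.

```python
from typing import Dict, List

# Hypothetical multi-task job, shaped like the "tasks" array of a Jobs API 2.1
# job definition: each task has a task_key and optional depends_on entries.
tasks = [
    {"task_key": "ingest"},
    {"task_key": "clean", "depends_on": [{"task_key": "ingest"}]},
    {"task_key": "transform_orders", "depends_on": [{"task_key": "clean"}]},
    {"task_key": "transform_customers", "depends_on": [{"task_key": "clean"}]},
    {
        "task_key": "write",
        "depends_on": [
            {"task_key": "transform_orders"},
            {"task_key": "transform_customers"},
        ],
    },
]


def execution_stages(tasks: List[dict]) -> List[List[str]]:
    """Group task keys into stages; each stage depends only on earlier stages."""
    deps: Dict[str, set] = {
        t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
        for t in tasks
    }
    done: set = set()
    stages: List[List[str]] = []
    while len(done) < len(deps):
        # A task is ready once every task it depends on has finished.
        ready = sorted(k for k, d in deps.items() if k not in done and d <= done)
        if not ready:
            raise ValueError("dependency cycle or unknown task_key detected")
        stages.append(ready)
        done.update(ready)
    return stages


for stage in execution_stages(tasks):
    print(stage)
```

In this example, the two transform tasks land in the same stage because both depend only on `clean`. The `write` task waits for both of them.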

Databricks also supports workflow management with Apache Airflow.

Apache Airflow

Apache Airflow is an open source solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations. You define a workflow in a Python file and Airflow manages the scheduling and execution.

Airflow provides tight integration with Databricks through the Databricks provider package. The Airflow Databricks integration lets you combine the optimized Spark engine offered by Databricks with the scheduling features of Airflow.

Requirements

  • The integration between Airflow and Databricks is available in Airflow version 1.9.0 and later. The examples in this article are tested with Airflow version 2.1.0.
  • Airflow requires Python 3.6, 3.7, or 3.8. The examples in this article are tested with Python 3.8.
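If you are not sure which interpreter your environment will use, a quick check like the following can catch an unsupported version before you install Airflow. This minimal sketch hard-codes the Python 3.6 to 3.8 range stated above for Airflow 2.1.0. Other Airflow releases support different Python versions.

```python
import sys

# Python versions listed above as supported by Airflow 2.1.0.
SUPPORTED = {(3, 6), (3, 7), (3, 8)}


def is_supported(version_info=sys.version_info) -> bool:
    """Return True if the (major, minor) version is in the supported set."""
    return tuple(version_info[:2]) in SUPPORTED


if __name__ == "__main__":
    major, minor = sys.version_info[:2]
    status = "supported" if is_supported() else "NOT supported"
    print(f"Python {major}.{minor} is {status} by Airflow 2.1.0")
```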

Install the Airflow Databricks integration

To install the Airflow Databricks integration, open a terminal and run the following commands:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow==2.1.0
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email your@email.com

These commands:

  1. Create a directory named airflow and change into that directory.
  2. Use pipenv to create and spawn a Python virtual environment. Databricks recommends using a Python virtual environment to isolate package versions and code dependencies to that environment. This isolation helps reduce unexpected package version mismatches and code dependency collisions.
  3. Set an environment variable named AIRFLOW_HOME to the path of the airflow directory.
  4. Install Airflow and the Airflow Databricks provider packages.
  5. Create an airflow/dags directory. Airflow uses the dags directory to store DAG definitions.
  6. Initialize a SQLite database that Airflow uses to track metadata. In a production Airflow deployment, you would configure Airflow with a production database such as PostgreSQL or MySQL. The SQLite database and the default configuration for your Airflow deployment are created in the airflow directory.
  7. Create an admin user for Airflow.
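Airflow finds its configuration, metadata database, and DAG folder through AIRFLOW_HOME. If the variable is unset, Airflow falls back to ~/airflow, which is why step 3 matters and why every new terminal must set the variable again.

The helper below is a hypothetical sketch that mirrors this default lookup, so you can confirm where Airflow will look for DAG files. It does not read airflow.cfg, so it will not reflect a customized dags_folder setting.

```python
import os
from pathlib import Path


def expected_dags_folder(env=None) -> Path:
    """Hypothetical helper: where Airflow looks for DAGs with default settings.

    Uses AIRFLOW_HOME if it is set (non-empty), otherwise ~/airflow, and
    appends the dags subdirectory. A custom dags_folder in airflow.cfg is
    NOT taken into account.
    """
    env = os.environ if env is None else env
    home = env.get("AIRFLOW_HOME") or str(Path.home() / "airflow")
    return Path(home).expanduser() / "dags"


if __name__ == "__main__":
    folder = expected_dags_folder()
    print(folder, "(exists)" if folder.is_dir() else "(missing)")
```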

To install extras such as celery, s3, and password, run:

pip install "apache-airflow[databricks, celery, s3, password]"

Start the Airflow web server and scheduler

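The Airflow web server is required to view the Airflow UI. To start the web server, run the following command in the terminal where you installed Airflow. That terminal already has the virtual environment activated and AIRFLOW_HOME set: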

airflow webserver

The scheduler is the Airflow component that schedules DAGs. To run it, open a new terminal, change to the airflow directory, and run the following commands:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
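Once both processes are running, you can check them from a script as well as from the UI. Airflow 2 web servers expose a /health endpoint that reports the status of the metadata database and the scheduler heartbeat.

The sketch below assumes the default address http://localhost:8080. It keeps the parsing separate from the HTTP call, so you can try the parsing on a sample payload without a running server.

```python
import json
import urllib.request

HEALTH_URL = "http://localhost:8080/health"  # assumes the default web server port


def unhealthy_components(payload: dict) -> list:
    """Return the names of components whose status is not 'healthy'."""
    return sorted(
        name
        for name, info in payload.items()
        if isinstance(info, dict) and info.get("status") != "healthy"
    )


def check_airflow(url: str = HEALTH_URL) -> list:
    """Fetch the health endpoint and report unhealthy components."""
    with urllib.request.urlopen(url, timeout=10) as resp:
        return unhealthy_components(json.load(resp))
```

After starting the web server and scheduler, call `check_airflow()`. An empty list means both the metadata database and the scheduler report healthy.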

Test the Airflow installation

To verify the Airflow installation, you can run one of the example DAGs included with Airflow:

  1. In a browser window, open http://localhost:8080/home. The Airflow DAGs screen appears.
  2. Click the Pause/Unpause DAG toggle to unpause one of the example DAGs, for example, the example_python_operator.
  3. Trigger the example DAG by clicking the Start button.
  4. Click the DAG name to view details, including the run status of the DAG.

Run a Databricks job from Airflow

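The Airflow Databricks integration provides two different operators for triggering jobs:

  • The DatabricksRunNowOperator requires an existing Databricks job and triggers a run of it through the Jobs API run-now endpoint.
  • The DatabricksSubmitRunOperator does not require an existing job; it submits a one-time run through the Jobs API runs submit endpoint.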
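DatabricksRunNowOperator and DatabricksSubmitRunOperator differ mainly in the request each one sends to the Databricks Jobs API. Both operators also accept a `json` argument whose contents are passed to the corresponding endpoint.

The sketch below builds minimal example request bodies for both endpoints as plain dictionaries. The job ID, notebook path, and cluster ID are hypothetical placeholders. A real run may need more fields, for example a full `new_cluster` specification instead of an existing cluster.

```python
import json

# Hypothetical placeholder values.
JOB_ID = 123
NOTEBOOK_PATH = "/Users/someone@example.com/Hello Airflow"
CLUSTER_ID = "0000-000000-example"

# DatabricksRunNowOperator: triggers an EXISTING job by ID (run-now endpoint).
# notebook_params override the notebook's widget values for this run.
run_now_body = {
    "job_id": JOB_ID,
    "notebook_params": {"greeting": "Airflow user"},
}

# DatabricksSubmitRunOperator: describes a one-time run inline (runs submit
# endpoint), so no job has to be created in the Databricks UI first.
submit_run_body = {
    "run_name": "hello-airflow-one-time-run",
    "existing_cluster_id": CLUSTER_ID,
    "notebook_task": {
        "notebook_path": NOTEBOOK_PATH,
        "base_parameters": {"greeting": "Airflow user"},
    },
}

print(json.dumps(run_now_body, indent=2))
print(json.dumps(submit_run_body, indent=2))
```

Use the run-now approach, as this article does, when the job is managed in Databricks. The submit approach keeps the whole run definition in the Airflow DAG instead.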

Example

The following example demonstrates how to create a simple Airflow deployment that runs on your local machine and deploys an example DAG to trigger runs in Databricks. For this example, you:

  1. Create a new notebook and add code to print a greeting based on a configured parameter.
  2. Create a Databricks job with a single task that runs the notebook.
  3. Configure an Airflow connection to your Databricks workspace.
  4. Create an Airflow DAG to trigger the notebook job. You define the DAG in a Python script using DatabricksRunNowOperator.
  5. Use the Airflow UI to trigger the DAG and view the run status.

Create a notebook

This example uses a notebook containing two cells:

  • The first cell contains a Databricks Utilities text widget defining a variable named greeting set to the default value world.
  • The second cell prints the value of the greeting variable prefixed by hello.

To create the notebook:

  1. Go to your Databricks landing page and select Create Blank Notebook or click Create in the sidebar and select Notebook from the menu. The Create Notebook dialog appears.

  2. In the Create Notebook dialog, give your notebook a name, such as Hello Airflow. Set Default Language to Python. Leave Cluster set to the default value. You will configure the cluster when you create a task that uses this notebook.

  3. Click Create.

  4. Copy the following Python code and paste it into the first cell of the notebook.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  5. Add a new cell below the first cell and copy and paste the following Python code into the new cell:

    print("hello {}".format(greeting))
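
Outside a Databricks notebook, dbutils is not available, so it can help to see the widget behavior that the job relies on in isolation. The sketch below uses a hypothetical `FakeWidgets` class, which is not part of Databricks. It mimics only the two calls used above:

  • `text()` registers a default value.
  • `get()` returns a run parameter with the same key if one was passed, and otherwise returns the default.

This is why the job's greeting parameter changes the output from "hello world" to "hello Airflow user".

```python
from typing import Dict, Optional


class FakeWidgets:
    """Hypothetical stand-in for dbutils.widgets, for local experiments only."""

    def __init__(self, params: Optional[Dict[str, str]] = None):
        self._params = dict(params or {})  # parameters passed by a job run
        self._defaults: Dict[str, str] = {}

    def text(self, name: str, default_value: str, label: str = "") -> None:
        # Register the widget and its default value (label is ignored here).
        self._defaults[name] = default_value

    def get(self, name: str) -> str:
        # A parameter supplied by the run takes precedence over the default.
        if name in self._params:
            return self._params[name]
        return self._defaults[name]


def greet(widgets: FakeWidgets) -> str:
    # Same logic as the two notebook cells above.
    widgets.text("greeting", "world", "Greeting")
    greeting = widgets.get("greeting")
    return "hello {}".format(greeting)


print(greet(FakeWidgets()))                              # hello world
print(greet(FakeWidgets({"greeting": "Airflow user"})))  # hello Airflow user
```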
    

Create a job

  1. Click Jobs in the sidebar.

  2. Click Create Job.

    The Tasks tab appears with the create task dialog.

  3. Replace Add a name for your job… with your job name.

  4. In the Task name field, enter a name for the task, for example, greeting-task.

  5. In the Type drop-down, select Notebook.

  6. Use the file browser to find the notebook you created, click the notebook name, and click Confirm.

  7. Click Add under Parameters. In the Key field, enter greeting. In the Value field, enter Airflow user.

  8. Click Create task.
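The same job can also be described as a request body for the Jobs API 2.1 jobs/create endpoint. This can be useful if you want to keep the job definition in version control next to your DAG.

This is a sketch. The job name, notebook path, and cluster ID are hypothetical placeholders, and you could use a `new_cluster` specification instead of `existing_cluster_id`.

```python
import json
from typing import Dict


def build_notebook_job(
    job_name: str,
    task_key: str,
    notebook_path: str,
    params: Dict[str, str],
    cluster_id: str,
) -> dict:
    """Build a single-task Jobs API 2.1 job definition that runs a notebook."""
    return {
        "name": job_name,
        "tasks": [
            {
                "task_key": task_key,
                "existing_cluster_id": cluster_id,
                "notebook_task": {
                    "notebook_path": notebook_path,
                    # Same key/value pair entered under Parameters in the UI.
                    "base_parameters": params,
                },
            }
        ],
    }


# Hypothetical values mirroring the UI steps above.
job = build_notebook_job(
    job_name="hello-airflow-job",
    task_key="greeting-task",
    notebook_path="/Users/someone@example.com/Hello Airflow",
    params={"greeting": "Airflow user"},
    cluster_id="0000-000000-example",
)
print(json.dumps(job, indent=2))
```

When you send this body to the jobs/create endpoint, the response contains a `job_id`. This is the same value the UI shows as the job ID.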

Run the job

To run the job immediately, click Run Now in the upper right corner. You can also run the job by clicking the Runs tab and clicking Run Now in the Active Runs table.

View run details

  1. Click the Runs tab and click View Details in the Active Runs table or the Completed Runs (past 60 days) table.

  2. Copy the Job ID value. This value is required to trigger the job from Airflow.


Create a Databricks personal access token

Airflow connects to Databricks using a Databricks personal access token (PAT). See the Databricks documentation on personal access tokens for instructions on creating a PAT.
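Databricks REST API calls send a PAT as a bearer token in the Authorization header. Before you configure Airflow, you can check that the token and the job ID work together by requesting the job's settings from the Jobs API jobs/get endpoint.

In this sketch, the environment variable names DATABRICKS_HOST, DATABRICKS_TOKEN, and DATABRICKS_JOB_ID are assumptions. The request is built separately from sending it, so you can inspect it without network access.

```python
import json
import os
import urllib.parse
import urllib.request


def build_get_job_request(host: str, token: str, job_id: int) -> urllib.request.Request:
    """Build (but do not send) a Jobs API 2.1 jobs/get request authenticated with a PAT."""
    base = host if host.startswith("https://") else "https://" + host
    query = urllib.parse.urlencode({"job_id": job_id})
    url = f"{base.rstrip('/')}/api/2.1/jobs/get?{query}"
    # A personal access token is sent as a bearer token.
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})


def get_job(host: str, token: str, job_id: int) -> dict:
    """Send the request; raises urllib.error.HTTPError on a bad token or job ID."""
    req = build_get_job_request(host, token, job_id)
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.load(resp)


# Runs only when the (assumed) environment variables are set.
if __name__ == "__main__" and "DATABRICKS_TOKEN" in os.environ:
    job = get_job(
        os.environ["DATABRICKS_HOST"],
        os.environ["DATABRICKS_TOKEN"],
        int(os.environ["DATABRICKS_JOB_ID"]),
    )
    print(job["settings"]["name"])
```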

Configure a Databricks connection

Your Airflow installation contains a default connection for Databricks. To update the connection to connect to your workspace using the personal access token you created above:

  1. In a browser window, open http://localhost:8080/connection/list/.

  2. Under Conn ID, locate databricks_default and click the Edit record button.

  3. Replace the value in the Host field with the workspace instance name of your Databricks deployment.

  4. In the Extra field, enter the following value:

    {"token": "PERSONAL_ACCESS_TOKEN"}
    

    Replace PERSONAL_ACCESS_TOKEN with your Databricks personal access token.
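As an alternative to editing the connection in the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID>. The variable's value is a connection URI, and query parameters in the URI become the connection's Extra fields. A connection defined this way takes precedence over the database entry and does not appear in the UI's connection list.

The sketch below builds such a URI for databricks_default. The host and token are placeholders, and the helper name is hypothetical.

```python
import urllib.parse


def databricks_conn_uri(host: str, token: str) -> str:
    """Build an Airflow connection URI for a Databricks workspace.

    The token is passed as a query parameter, which Airflow stores in the
    connection's Extra field as {"token": "..."}.
    """
    if host.startswith("https://"):
        host = host[len("https://"):]
    host = host.rstrip("/")
    return f"databricks://{host}?{urllib.parse.urlencode({'token': token})}"


# Placeholder values. The printed line would need to be run in each terminal
# that starts Airflow processes (web server and scheduler).
uri = databricks_conn_uri("adb-1234567890123456.7.azuredatabricks.net", "PERSONAL_ACCESS_TOKEN")
print(f"export AIRFLOW_CONN_DATABRICKS_DEFAULT='{uri}'")
```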

Create a new DAG

You define an Airflow DAG in a Python file. To create a DAG to trigger the example notebook job:

  1. In a text editor or IDE, create a new file named databricks_dag.py with the following contents:

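    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago

    # Arguments applied to every task in the DAG.
    default_args = {
        'owner': 'airflow'
    }

    # schedule_interval=None: the DAG runs only when triggered manually.
    with DAG('databricks_dag',
             start_date=days_ago(2),
             schedule_interval=None,
             default_args=default_args
             ) as dag:

        # Trigger a run of the existing Databricks job through the
        # databricks_default connection configured earlier.
        opr_run_now = DatabricksRunNowOperator(
            task_id='run_now',
            databricks_conn_id='databricks_default',
            job_id=JOB_ID  # replace with your job ID
        )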
    

    Replace JOB_ID with the value of the job ID saved earlier.

  2. Save the file in the airflow/dags directory. Airflow periodically scans airflow/dags/ and automatically loads the DAG files stored there.

Install and verify the DAG in Airflow

To trigger and verify the DAG in the Airflow UI:

  1. In a browser window, open http://localhost:8080/home. The Airflow DAGs screen appears.
  2. Locate databricks_dag and click the Pause/Unpause DAG toggle to unpause the DAG.
  3. Trigger the DAG by clicking the Start button.
  4. Click a run in the Runs column to view the status and details of the run.
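By default, DatabricksRunNowOperator waits for the triggered run to finish by polling its state, and it fails the Airflow task if the run does not succeed.

The sketch below shows how a Jobs API runs/get response can be interpreted in the same spirit, which is handy if you want to check a run outside Airflow. The `life_cycle_state` and `result_state` field names come from the Jobs API. The helper function and the sample payload are hypothetical and simplified.

```python
from typing import Optional

# Life cycle states after which a run will not change anymore.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def run_outcome(run: dict) -> Optional[str]:
    """Return None while the run is in progress, else 'success' or 'failed: ...'."""
    state = run.get("state", {})
    life_cycle = state.get("life_cycle_state")
    if life_cycle not in TERMINAL_STATES:
        return None  # PENDING, RUNNING, TERMINATING, ...: keep polling
    result = state.get("result_state")
    if life_cycle == "TERMINATED" and result == "SUCCESS":
        return "success"
    return f"failed: {life_cycle}/{result} {state.get('state_message', '')}".strip()


sample = {"run_id": 1, "state": {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"}}
print(run_outcome(sample))  # success
```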