Orchestrate Databricks jobs with Apache Airflow
This article shows an example of orchestrating Databricks jobs in a data pipeline with Apache Airflow. You’ll also learn how to set up the Airflow integration with Databricks. Job orchestration manages complex dependencies between tasks.
Job orchestration in a data pipeline
Developing and deploying a data processing pipeline often requires managing complex dependencies between tasks. For example, a pipeline might read data from a source, clean the data, transform the cleaned data, and write the transformed data to a target. You need to test, schedule, and troubleshoot data pipelines when you operationalize them.
Workflow systems address these challenges by allowing you to define dependencies between tasks, schedule when pipelines run, and monitor workflows. Apache Airflow is an open source solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations. You define a workflow in a Python file and Airflow manages the scheduling and execution. The Airflow Databricks connection lets you take advantage of the optimized Spark engine offered by Databricks with the scheduling features of Airflow.
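For illustration, the following is a minimal sketch of an Airflow DAG with three tasks chained into a simple pipeline. The DAG name, task names, and callables here are placeholders, not part of the example developed later in this article:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# A minimal sketch: three placeholder tasks chained into a pipeline.
with DAG('example_pipeline', start_date=days_ago(1), schedule_interval=None) as dag:
    extract = PythonOperator(task_id='extract', python_callable=lambda: print('read from source'))
    clean = PythonOperator(task_id='clean', python_callable=lambda: print('clean the data'))
    load = PythonOperator(task_id='load', python_callable=lambda: print('write to target'))

    # The >> operator declares dependencies, forming a directed acyclic graph.
    extract >> clean >> load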
Requirements
The integration between Airflow and Databricks is available in Airflow version 1.9.0 and later. The examples in this article are tested with Airflow version 2.1.0.
Airflow requires Python 3.6, 3.7, or 3.8. The examples in this article are tested with Python 3.8.
Install the Airflow Databricks integration
To install the Airflow Databricks integration, open a terminal and run the following commands. Be sure to substitute your user name and email in the last line:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow==2.1.0
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email your@email.com
When you copy and run the script above, you perform these steps:
Create a directory named airflow and change into that directory.
Use pipenv to create and spawn a Python virtual environment. Databricks recommends using a Python virtual environment to isolate package versions and code dependencies to that environment. This isolation helps reduce unexpected package version mismatches and code dependency collisions.
Initialize an environment variable named AIRFLOW_HOME set to the path of the airflow directory.
Install Airflow and the Airflow Databricks provider packages.
Create an airflow/dags directory. Airflow uses the dags directory to store DAG definitions.
Initialize a SQLite database that Airflow uses to track metadata. In a production Airflow deployment, you would configure Airflow with a standard database. The SQLite database and default configuration for your Airflow deployment are initialized in the airflow directory.
Create an admin user for Airflow.
To install extras, for example, celery, s3, and password, run:
pip install "apache-airflow[databricks, celery, s3, password]"
Start the Airflow web server and scheduler
The Airflow web server is required to view the Airflow UI. To start the web server, open a terminal and run the following command:
airflow webserver
The scheduler is the Airflow component that schedules DAGs. To run it, open a new terminal and run the following commands:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Test the Airflow installation
To verify the Airflow installation, you can run one of the example DAGs included with Airflow:
In a browser window, open http://localhost:8080/home. The Airflow DAGs screen appears.
Click the Pause/Unpause DAG toggle to unpause one of the example DAGs, for example, example_python_operator.
Trigger the example DAG by clicking the Start button.
Click the DAG name to view details, including the run status of the DAG.
Airflow operators for Databricks
The Airflow Databricks integration provides two different operators for triggering jobs:
The DatabricksRunNowOperator requires an existing Databricks job and uses the Trigger a new job run (POST /jobs/run-now) API request to trigger a run. Databricks recommends using DatabricksRunNowOperator because it reduces duplication of job definitions, and job runs triggered with this operator are easy to find in the jobs UI.
The DatabricksSubmitRunOperator does not require a job to exist in Databricks and uses the Create and trigger a one-time run (POST /jobs/runs/submit) API request to submit the job specification and trigger a run.
The Databricks Airflow operator writes the job run page URL to the Airflow logs every polling_period_seconds
(the default is 30 seconds). For more information, see the apache-airflow-providers-databricks package page on the Airflow website.
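For reference, the following is a minimal sketch of a DAG that uses DatabricksSubmitRunOperator to submit a one-time notebook run. The cluster specification and notebook path are placeholders; the rest of this article uses DatabricksRunNowOperator instead:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

# Placeholder cluster specification; adjust the runtime version and node type
# for your workspace and cloud provider.
new_cluster = {
    'spark_version': '10.4.x-scala2.12',
    'node_type_id': 'i3.xlarge',
    'num_workers': 1
}

with DAG('databricks_submit_run_dag',
    start_date = days_ago(2),
    schedule_interval = None
    ) as dag:

    # Submits the job specification and triggers a one-time run
    # (POST /jobs/runs/submit) without requiring an existing job.
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id = 'submit_run',
        databricks_conn_id = 'databricks_default',
        new_cluster = new_cluster,
        notebook_task = {'notebook_path': '/Users/<user>/Hello Airflow'}
    )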
Run a Databricks job with Airflow
The following example demonstrates how to create a simple Airflow deployment that runs on your local machine and deploys an example DAG to trigger runs in Databricks. For this example, you:
Create a new notebook and add code to print a greeting based on a configured parameter.
Create a Databricks job with a single task that runs the notebook.
Configure an Airflow connection to your Databricks workspace.
Create an Airflow DAG to trigger the notebook job. You define the DAG in a Python script using DatabricksRunNowOperator.
Use the Airflow UI to trigger the DAG and view the run status.
Create a notebook
This example uses a notebook containing two cells:
The first cell contains a Databricks Utilities text widget defining a variable named greeting set to the default value world.
The second cell prints the value of the greeting variable prefixed by hello.
To create the notebook:
Go to your Databricks landing page and select Create Blank Notebook, or click New in the sidebar and select Notebook. The Create Notebook dialog appears.
In the Create Notebook dialog, give your notebook a name, such as Hello Airflow. Set Default Language to Python. Leave Cluster set to the default value. You will configure the cluster when you create a task that uses this notebook.
Click Create.
Copy the following Python code and paste it into the first cell of the notebook.
dbutils.widgets.text("greeting", "world", "Greeting")
greeting = dbutils.widgets.get("greeting")
Add a new cell below the first cell and copy and paste the following Python code into the new cell:
print("hello {}".format(greeting))
Create a job
Click Workflows in the sidebar.
Click Create Job.
The Tasks tab displays with the create task dialog.
Replace Add a name for your job… with your job name.
In the Task name field, enter a name for the task, for example, greeting-task.
In the Type dropdown menu, select Notebook.
Use the file browser to find the notebook you created, click the notebook name, and click Confirm.
Click Add under Parameters. In the Key field, enter greeting. In the Value field, enter Airflow user.
Click Create task.
Run the job
To run the job immediately, click Run Now in the upper right corner. You can also run the job by clicking the Runs tab and clicking Run Now in the Active Runs table.
View run details
Click the Runs tab and click View Details in the Active Runs table or the Completed Runs (past 60 days) table.
Copy the Job ID value. This value is required to trigger the job from Airflow.
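If you prefer to look up the job ID programmatically instead of copying it from the UI, the following sketch queries the Jobs API list endpoint. The workspace URL, token, and job name shown here are placeholders:

import requests

# Placeholders: substitute your workspace instance name, personal access
# token, and the name you gave the job.
DATABRICKS_HOST = 'https://<workspace-instance-name>'
DATABRICKS_TOKEN = '<personal-access-token>'
JOB_NAME = '<your job name>'

response = requests.get(
    f'{DATABRICKS_HOST}/api/2.1/jobs/list',
    headers={'Authorization': f'Bearer {DATABRICKS_TOKEN}'}
)
response.raise_for_status()

# Print the job ID of the job whose name matches JOB_NAME.
for job in response.json().get('jobs', []):
    if job['settings']['name'] == JOB_NAME:
        print(job['job_id'])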
Create a Databricks personal access token for Airflow
Note
As a security best practice when you authenticate with automated tools, systems, scripts, and apps, Databricks recommends that you use OAuth tokens or personal access tokens belonging to service principals instead of workspace users. To create tokens for service principals, see Manage tokens for a service principal.
Airflow connects to Databricks using a Databricks personal access token (PAT). See personal access token for instructions on creating a PAT.
Configure a Databricks connection
Your Airflow installation contains a default connection for Databricks. To update the connection to connect to your workspace using the personal access token you created above:
In a browser window, open http://localhost:8080/connection/list/.
Under Conn ID, locate databricks_default and click the Edit record button.
Replace the value in the Host field with the workspace instance name of your Databricks deployment.
In the Extra field, enter the following value:
{"token": "PERSONAL_ACCESS_TOKEN"}
Replace PERSONAL_ACCESS_TOKEN with your Databricks personal access token.
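As an aside, the same fields can be expressed with Airflow’s Connection model, which can be useful if you prefer to define the connection outside the UI, for example through an AIRFLOW_CONN_* environment variable. This is a sketch; the host and token values are placeholders:

import json
from airflow.models import Connection

# Placeholders: substitute your workspace instance name and personal access token.
databricks_conn = Connection(
    conn_id='databricks_default',
    conn_type='databricks',
    host='https://<workspace-instance-name>',
    extra=json.dumps({'token': '<personal-access-token>'})
)

# get_uri() shows how Airflow serializes the connection, which is the format
# accepted by an AIRFLOW_CONN_DATABRICKS_DEFAULT environment variable.
print(databricks_conn.get_uri())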
Create a new Airflow DAG
You define an Airflow DAG in a Python file. To create a DAG to trigger the example notebook job:
In a text editor or IDE, create a new file named databricks_dag.py with the following contents:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow'
}

with DAG('databricks_dag',
    start_date = days_ago(2),
    schedule_interval = None,
    default_args = default_args
    ) as dag:

    opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
    )
Replace JOB_ID with the value of the job ID saved earlier.
Save the file in the airflow/dags directory. Airflow automatically reads and installs DAG files stored in airflow/dags/.
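Optionally, because the notebook reads its greeting from a widget, you can override the job’s configured parameter at trigger time by passing notebook_params to DatabricksRunNowOperator. The following variation of databricks_dag.py is a sketch, and the parameter value is only an example:

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.utils.dates import days_ago

with DAG('databricks_dag',
    start_date = days_ago(2),
    schedule_interval = None,
    default_args = {'owner': 'airflow'}
    ) as dag:

    # notebook_params overrides the "greeting" value configured on the job
    # for this triggered run. Replace JOB_ID with your job ID, as above.
    opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID,
        notebook_params = {'greeting': 'Hello from Airflow'}
    )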
Install and verify the DAG in Airflow
To trigger and verify the DAG in the Airflow UI:
In a browser window, open http://localhost:8080/home. The Airflow DAGs screen appears.
Locate databricks_dag and click the Pause/Unpause DAG toggle to unpause the DAG.
Trigger the DAG by clicking the Start button.
Click a run in the Runs column to view the status and details of the run.