Managing dependencies in data pipelines
Data pipelines often contain complex dependencies between their operations. Workflow systems let you describe such dependencies and schedule when pipelines run.
Apache Airflow
Apache Airflow is a solution for managing and scheduling data pipelines. Airflow represents data pipelines as directed acyclic graphs (DAGs) of operations, where an edge represents a logical dependency between operations.
Databricks provides tight integration with Airflow. The Airflow Databricks integration lets you take advantage of the optimized Spark engine offered by Databricks together with the scheduling features of Airflow.
Install the Airflow Databricks integration
The integration between Airflow and Databricks is available in Airflow version 1.9.0. To install the Airflow Databricks integration, run:
pip install "apache-airflow[databricks]"
To install extras (for example celery, s3, and password), run:
pip install "apache-airflow[databricks, celery, s3, password]"
DatabricksRunNowOperator operator
The Airflow Databricks integration provides DatabricksRunNowOperator as a node in your DAG of computations. This operator matches the Databricks jobs Run now API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS or S3.
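As a rough sketch only (not taken from the example later in this article), a DatabricksRunNowOperator task triggers an existing Databricks job by its job ID. The DAG ID, job ID, and notebook parameters below are hypothetical, and the import assumes the operator lives in the same contrib module as DatabricksSubmitRunOperator:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksRunNowOperator

# Hypothetical DAG used only to host the sketch below.
dag = DAG(
    dag_id='example_run_now',
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval='@daily')

run_now_task = DatabricksRunNowOperator(
    task_id='run_now_task',
    dag=dag,
    job_id=42,                                   # ID of an existing Databricks job (hypothetical)
    notebook_params={'input_date': '{{ ds }}'})  # optional parameters passed to the job's notebook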
DatabricksSubmitRunOperator operator
The Airflow Databricks integration provides DatabricksSubmitRunOperator as a node in your DAG of computations. This operator matches the Databricks jobs Runs submit API endpoint and allows you to programmatically run notebooks and JARs uploaded to DBFS or S3.
Configure a Databricks connection
To use DatabricksSubmitRunOperator you must provide credentials in the appropriate Airflow connection. By default, if you do not specify the databricks_conn_id parameter to DatabricksSubmitRunOperator, the operator tries to find credentials in the connection with the ID databricks_default.
You can configure Airflow connections through the Airflow web UI as instructed in Managing Connections. For the Databricks connection, set the Login field to token and the Extra field to:
{"token": "<personal access token>", "host":"<Databricks hostname>"}
where <personal access token> is a Databricks-generated personal access token and <Databricks hostname> is the hostname of your Databricks deployment.
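If you prefer to script the connection rather than use the web UI, a hedged sketch using Airflow's Connection model follows; the placeholder values match those above, and if a databricks_default connection already exists in your deployment you should update it instead of adding a duplicate:
# Sketch: create the databricks_default connection programmatically
# instead of through the web UI. Run once against your Airflow metadata database.
from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id='databricks_default',
    conn_type='databricks',
    host='<Databricks hostname>',
    login='token',
    extra='{"token": "<personal access token>", "host": "<Databricks hostname>"}')

session = settings.Session()
session.add(conn)
session.commit()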
Example
In this example, we show how to set up a simple Airflow deployment that runs on your local machine and deploys an example DAG named example_databricks_operator that triggers runs in Databricks.
Initialize Airflow database
Initialize the SQLite database that Airflow uses to track miscellaneous metadata. In a production Airflow deployment, you would configure Airflow with a standard database. To perform the initialization run:
airflow initdb
The SQLite database and default configuration for your Airflow deployment are initialized in ~/airflow.
DAG definition
A DAG definition is a Python file and in this example is named example_databricks_operator.py. The example runs two Databricks jobs with one linear dependency. The first Databricks job triggers a notebook located at /Users/airflow@example.com/PrepareData and the second runs a JAR located at dbfs:/lib/etl-0.1.jar. The example DAG definition constructs two DatabricksSubmitRunOperator tasks and then sets the dependency at the end with the set_downstream method. A skeleton version of the code looks something like:
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    dag=dag,
    json=spark_jar_task_params)

notebook_task.set_downstream(spark_jar_task)
Import Airflow and required classes
The top of a DAG definition imports airflow, DAG, and DatabricksSubmitRunOperator:
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
Configure global arguments
The next section sets default arguments applied to each task in the DAG.
args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0)
}
The two interesting arguments are depends_on_past and start_date. Setting depends_on_past to True signals that a task should not be triggered unless the previous instance of the task completed successfully. The start_date argument determines when the first task instance will be scheduled.
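If you want deterministic scheduling, you can also pin start_date to a fixed date instead of the dynamic days_ago(0); the date below is only illustrative:
from datetime import datetime

# Hypothetical fixed start date instead of the dynamic days_ago(0).
args['start_date'] = datetime(2017, 7, 1)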
Instantiate the DAG
The DAG instantiation statement gives the DAG a unique ID, attaches the default arguments, and gives it a daily schedule.
dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')
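The schedule_interval argument also accepts standard cron expressions; for example, the following is equivalent to the '@daily' preset:
# Equivalent to '@daily': run once a day at midnight.
dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='0 0 * * *')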
The next statement specifies the Spark version, node type, and number of workers in the cluster that will run your tasks. The schema of the specification matches the new_cluster field of the job Runs Submit endpoint.
new_cluster = {
    'spark_version': '6.0.x-scala2.11',
    'node_type_id': 'i3.xlarge',
    'aws_attributes': {
        'availability': 'ON_DEMAND'
    },
    'num_workers': 8
}
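The new_cluster field can also carry an autoscale range instead of a fixed num_workers. A hedged variant of the specification above (the worker bounds are illustrative):
# Variant sketch: let Databricks autoscale the cluster instead of fixing num_workers.
autoscaling_cluster = {
    'spark_version': '6.0.x-scala2.11',
    'node_type_id': 'i3.xlarge',
    'aws_attributes': {
        'availability': 'ON_DEMAND'
    },
    'autoscale': {
        'min_workers': 2,   # illustrative bounds
        'max_workers': 8
    }
}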
Register tasks in DAG
For notebook_task, instantiate DatabricksSubmitRunOperator.
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
    },
}

# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)
In this piece of code, the json parameter takes a Python dictionary that matches the Runs Submit endpoint.
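The notebook_task block in that dictionary can also carry base_parameters that are passed to the notebook's widgets. A hedged variant of the dictionary above; the parameter name is hypothetical and '{{ ds }}' is an Airflow template macro rendered at run time:
# Sketch: a variant of notebook_task_params that passes widget parameters to the notebook.
parameterized_notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
        'base_parameters': {
            'input_date': '{{ ds }}'   # hypothetical parameter name
        },
    },
}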
For spark_jar_task, which runs a JAR located at dbfs:/lib/etl-0.1.jar, instantiate DatabricksSubmitRunOperator.
# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    dag=dag,
    new_cluster=new_cluster,
    spark_jar_task={
        'main_class_name': 'com.example.ProcessData'
    },
    libraries=[
        {
            'jar': 'dbfs:/lib/etl-0.1.jar'
        }
    ]
)
To configure spark_jar_task to run downstream, use the set_downstream method on notebook_task to register the dependency.
notebook_task.set_downstream(spark_jar_task)
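Airflow also supports the bitshift shorthand for declaring the same dependency:
# Equivalent to notebook_task.set_downstream(spark_jar_task).
notebook_task >> spark_jar_task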
Notice that in notebook_task we used the json parameter to specify the full specification for the submit run endpoint and that in spark_jar_task we flattened the top level keys of the submit run endpoint into parameters for DatabricksSubmitRunOperator. Although both ways of instantiating the operator are equivalent, the latter method does not allow you to use any new top level fields such as spark_python_task or spark_submit_task. For details, see the DatabricksSubmitRunOperator API.
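For instance, to run a Python file you could pass a spark_python_task block through the json parameter; the following is only a sketch, and the DBFS path and arguments are hypothetical:
# Sketch: a spark_python_task submitted through the json parameter.
spark_python_task_params = {
    'new_cluster': new_cluster,
    'spark_python_task': {
        'python_file': 'dbfs:/path/to/etl.py',      # hypothetical path
        'parameters': ['--date', '{{ ds }}']
    },
}

spark_python_task = DatabricksSubmitRunOperator(
    task_id='spark_python_task',
    dag=dag,
    json=spark_python_task_params)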
Install and verify the DAG in Airflow
To install the DAG in Airflow, create the directory ~/airflow/dags and copy the DAG definition file into that directory.
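For example, assuming example_databricks_operator.py is in your current working directory:
mkdir -p ~/airflow/dags
cp example_databricks_operator.py ~/airflow/dags/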
To verify that Airflow has read in the DAG, run the list_dags command:
airflow list_dags
[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...
Visualize the DAG in the Airflow UI
You can visualize the DAG in the Airflow web UI. Run airflow webserver and connect to localhost:8080. Click example_databricks_operator to see many visualizations of your DAG.
Configure the Databricks connection in Airflow
The connection credentials for Databricks aren’t specified in the DAG definition. By default, DatabricksSubmitRunOperator sets the databricks_conn_id parameter to databricks_default, so add a connection with the ID databricks_default through the web UI as described in Configure a Databricks connection.
Test each task
To test notebook_task, run airflow test example_databricks_operator notebook_task <YYYY-MM-DD> and for spark_jar_task, run airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>. To run the DAG on a schedule, you would invoke the scheduler daemon process with the command airflow scheduler.
After starting the scheduler, you should be able to see backfilled runs of your DAG in the web UI.