Import Python modules from Databricks repos
This article provides guidance on how you can import Python modules and packages from files stored in a Databricks repo into Delta Live Tables pipelines. You can store Python code in the Databricks repo as modules or packages. You can then import the Python code in your pipeline notebooks. For more information about managing files in a Databricks repo, see Work with Python and R modules.
Note
Source code can only be imported from files in a Databricks repo. You cannot import source code from a workspace file.
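As a quick orientation before the detailed walkthrough, the basic pattern is a module file stored in the repo plus a pipeline notebook that imports it. The file, function, and dataset names in this sketch are hypothetical:

# my_utils.py -- a hypothetical module stored at the root of the repo
from pyspark.sql.functions import col

def filter_positive(df, column_name):
  # Keep only rows where the given column is greater than zero
  return df.filter(col(column_name) > 0)

# In a pipeline notebook in the same repo
import dlt
from my_utils import filter_positive  # resolved from the repo root

@dlt.table
def filtered_events():
  # "events_raw" is a hypothetical dataset defined elsewhere in the pipeline
  return filter_positive(dlt.read("events_raw"), "click_count")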
Import a Python module to Delta Live Tables from a repo
The following example adapts the Delta Live Tables tutorial by importing its dataset queries as Python modules from a repo. To run this example, use the following steps:
1. To create a repo for your Python code, click Repos in the sidebar and click Add Repo.
2. Deselect Create repo by cloning a Git repository and enter a name for the repo in Repository name, for example, dlt-quickstart-repo.
3. Create a module to read source data into a table: click the down arrow next to the repo name, select Create > File, and enter a name for the file, for example, clickstream_raw_module.py. The file editor opens. Enter the following in the editor window:

   from dlt import *

   json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"

   def create_clickstream_raw_table(spark):
     @table
     def clickstream_raw():
       return (
         spark.read.json(json_path)
       )
4. Create a module to create a new table containing prepared data: select Create > File again and enter a name for the file, for example, clickstream_prepared_module.py. Enter the following in the new editor window:

   from clickstream_raw_module import *
   from dlt import read
   from pyspark.sql.functions import *
   from pyspark.sql.types import *

   def create_clickstream_prepared_table(spark):
     create_clickstream_raw_table(spark)

     @table
     @expect("valid_current_page_title", "current_page_title IS NOT NULL")
     @expect_or_fail("valid_count", "click_count > 0")
     def clickstream_prepared():
       return (
         read("clickstream_raw")
           .withColumn("click_count", expr("CAST(n AS INT)"))
           .withColumnRenamed("curr_title", "current_page_title")
           .withColumnRenamed("prev_title", "previous_page_title")
           .select("current_page_title", "click_count", "previous_page_title")
       )
5. Create a pipeline notebook: go to your Databricks landing page and select Create a notebook, or click New in the sidebar and select Notebook. The Create Notebook dialog appears. You can also create the notebook in the repo by clicking the down arrow next to the repo name and selecting Create > Notebook.
6. In the Create Notebook dialog, give your notebook a name and select Python from the Default Language dropdown menu. You can leave Cluster set to the default value.
7. Click Create.
8. Enter the example code in the notebook.
   If you created the notebook in a repo path that's different from the Python modules path, enter the following code in the first cell of the notebook:

   import sys, os
   sys.path.append(os.path.abspath('<repo-path>'))

   import dlt
   from clickstream_prepared_module import *
   from pyspark.sql.functions import *
   from pyspark.sql.types import *

   create_clickstream_prepared_table(spark)

   @dlt.table(
     comment="A table containing the top pages linking to the Apache Spark page."
   )
   def top_spark_referrers():
     return (
       dlt.read("clickstream_prepared")
         .filter(expr("current_page_title == 'Apache_Spark'"))
         .withColumnRenamed("previous_page_title", "referrer")
         .sort(desc("click_count"))
         .select("referrer", "click_count")
         .limit(10)
     )
   Replace <repo-path> with the path to the Databricks repo containing the Python modules to import (an example path appears after these steps).

   If you created your pipeline notebook in the same repo as the modules you're importing, you do not need to specify the repo path with sys.path.append. Enter the following code in the first cell of the notebook:

   import sys, os

   import dlt
   from clickstream_prepared_module import *
   from pyspark.sql.functions import *
   from pyspark.sql.types import *

   create_clickstream_prepared_table(spark)

   @dlt.table(
     comment="A table containing the top pages linking to the Apache Spark page."
   )
   def top_spark_referrers():
     return (
       dlt.read("clickstream_prepared")
         .filter(expr("current_page_title == 'Apache_Spark'"))
         .withColumnRenamed("previous_page_title", "referrer")
         .sort(desc("click_count"))
         .select("referrer", "click_count")
         .limit(10)
     )
9. Create a pipeline using the new notebook.
10. To run the pipeline, on the Pipeline details page, click Start.
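The exact value for <repo-path> depends on your workspace. As one hedged example, a repo created under your user folder is often available at a filesystem path like the one below; the user folder and repo name are hypothetical, so confirm the path in your own workspace before using it:

import sys, os

# Hypothetical path: substitute your own user folder and repo name, and
# verify the repo's filesystem path in your workspace.
sys.path.append(os.path.abspath('/Workspace/Repos/someone@example.com/dlt-quickstart-repo'))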
You can also import Python code as a package. The following code snippet from a Delta Live Tables notebook imports the test_utils package from the dlt_packages directory inside the same repo root as the notebook. The dlt_packages directory contains the files test_utils.py and __init__.py, and test_utils.py defines the function create_test_table():
import dlt

@dlt.table
def my_table():
  return dlt.read(...)

# ...

import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)
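For context, a minimal sketch of what dlt_packages/test_utils.py could contain follows. Only the create_test_table(spark) function name comes from the snippet above; the table name, comment, and sample data are hypothetical, and the accompanying __init__.py can be empty:

# dlt_packages/test_utils.py -- hypothetical contents
import dlt

def create_test_table(spark):
  @dlt.table(
    name="test_table",
    comment="A small table used for testing the pipeline."
  )
  def test_table():
    # Build a tiny in-memory DataFrame as the table's contents
    return spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])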