CI/CD with Jenkins on Databricks
Note
This article covers Jenkins, which is neither provided nor supported by Databricks. To contact the provider, see Jenkins Help.
Continuous integration and continuous delivery (CI/CD) refers to the process of developing and delivering software in short, frequent cycles through the use of automation pipelines. While this is by no means a new process, having been ubiquitous in traditional software engineering for decades, it is becoming an increasingly necessary process for data engineering and data science teams. In order for data products to be valuable, they must be delivered in a timely manner. Additionally, consumers must have confidence in the validity of outcomes within these products. By automating the building, testing, and deployment of code, development teams are able to deliver releases more frequently and reliably than with the manual processes that are still prevalent across many data engineering and data science teams.
Continuous integration begins with the practice of committing your code with some frequency to a branch within a source code repository. Each commit is then merged with the commits from other developers to ensure that no conflicts were introduced. Changes are further validated by creating a build and running automated tests against that build. This process ultimately results in an artifact, or deployment bundle, that will eventually be deployed to a target environment, in this case a Databricks workspace.
Overview of a typical Databricks CI/CD pipeline
Though it can vary based on your needs, a typical configuration for a Databricks pipeline includes the following steps:
Continuous integration:
Code
Develop code and unit tests in a Databricks notebook or using an external IDE.
Manually run tests.
Commit code and tests to a git branch.
Build
Gather new and updated code and tests.
Run automated tests.
Build libraries and non-notebook Apache Spark code.
Release: Generate a release artifact.
Continuous delivery:
Deploy
Deploy notebooks.
Deploy libraries.
Test: Run automated tests and report results.
Operate: Programmatically schedule data engineering, analytics, and machine learning workflows.
Develop and commit your code
One of the first steps in designing a CI/CD pipeline is deciding on a code commit and branching strategy to manage the development and integration of new and updated code without adversely affecting the code currently in production. Part of this decision involves choosing a version control system to contain your code and facilitate the promotion of that code. Databricks supports integrations with GitHub and Bitbucket, which allow you to commit notebooks to a git repository.
If your version control system is not among those supported through direct notebook integration, or if you want more flexibility and control than the self-service git integration provides, you can use the Databricks CLI to export notebooks and commit them from your local machine. Such a script should be run from within a local git repository that is set up to sync with the appropriate remote repository. When executed, the script should:
Check out the desired branch.
Pull new changes from the remote branch.
Export notebooks from the Databricks workspace using the Databricks CLI.
Prompt the user for a commit message or use the default if one is not provided.
Commit the updated notebooks to the local branch.
Push the changes to the remote branch.
The following script performs these steps:
git checkout <branch>
git pull
databricks workspace export_dir --profile <profile> -o <path> ./Workspace
dt=`date '+%Y-%m-%d %H:%M:%S'`
msg_default="DB export on $dt"
read -p "Enter the commit comment [$msg_default]: " msg
msg=${msg:-$msg_default}
echo "$msg"
git add .
git commit -m "$msg"
git push
If you prefer to develop in an IDE rather than in Databricks notebooks, you can use the VCS integration features built into modern IDEs or the git CLI to commit your code.
Databricks provides Databricks Connect, an SDK that connects IDEs to Databricks clusters. This is especially useful when developing libraries, as it allows you to run and unit test your code on Databricks clusters without having to deploy that code. See Databricks Connect limitations to determine whether your use case is supported.
Note
Databricks now recommends that you use dbx by Databricks Labs for local development instead of Databricks Connect.
Depending on your branching strategy and promotion process, the point at which a CI/CD pipeline will initiate a build will vary. However, committed code from various contributors will eventually be merged into a designated branch to be built and deployed. Branch management steps run outside of Databricks, using the interfaces provided by the version control system.
There are numerous CI/CD tools you can use to manage and execute your pipeline. This article illustrates how to use the Jenkins automation server. CI/CD is a design pattern, so the steps and stages outlined in this article should transfer with a few changes to the pipeline definition language in each tool. Furthermore, much of the code in this example pipeline runs standard Python code, which you can invoke in other tools.
Configure your agent
Jenkins uses a master service for coordination and one or more execution agents. In this example you use the default permanent agent node included with the Jenkins server. You must manually install the following tools and packages required by the pipeline on the agent, in this case the Jenkins server:
Conda: an open source Python environment management system.
Python 3.7: used to run tests, build a deployment wheel, and execute deployment scripts. The version of Python is important, because the tests require that the version of Python running on the agent match the version on the Databricks cluster. This example uses Databricks Runtime 7.3, which includes Python 3.7.
Python libraries: requests, databricks-connect, databricks-cli, and pytest.
A quick sanity check for this agent environment is sketched below.
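Because the agent's Python version must match the cluster's, it can help to verify the environment before wiring it into the pipeline. The following is a minimal, hypothetical helper (not part of the original pipeline); the required version and package names are assumptions based on the setup described above:
# check_agent_env.py (hypothetical helper, not part of the pipeline in this article)
# Verifies that the agent's Conda environment matches the assumptions above:
# Python 3.7 plus the packages the pipeline needs.
import importlib.util
import sys

REQUIRED_PYTHON = (3, 7)  # should match the cluster's Databricks Runtime (7.3 includes Python 3.7)
REQUIRED_PACKAGES = ['requests', 'databricks_cli', 'pyspark', 'pytest']  # pyspark is provided by databricks-connect

def main():
    problems = []
    if sys.version_info[:2] != REQUIRED_PYTHON:
        problems.append('Python %d.%d found, expected %d.%d' %
                        (sys.version_info[0], sys.version_info[1], REQUIRED_PYTHON[0], REQUIRED_PYTHON[1]))
    for package in REQUIRED_PACKAGES:
        if importlib.util.find_spec(package) is None:
            problems.append('package not importable: ' + package)
    if problems:
        print('Agent environment issues: ' + '; '.join(problems))
        sys.exit(1)
    print('Agent environment looks OK')

if __name__ == '__main__':
    main()
Run it inside the activated Conda environment; a non-zero exit code surfaces the problem before any pipeline stages run.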
Design the pipeline
Jenkins provides a few different project types to create CI/CD pipelines. This example implements a Jenkins Pipeline. Jenkins Pipelines provide an interface to define stages in a Pipeline using Groovy code to call and configure Jenkins plugins.

You write a Pipeline definition in a text file (called a Jenkinsfile) which in turn is checked into a project’s source control repository. For more information, see Jenkins Pipeline. Here is an example Pipeline:
// Jenkinsfile
node {
def GITREPO = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE = "https://github.com/<repo>"
def GITHUBCREDID = "<github-token>"
def CURRENTRELEASE = "<release>"
def DBTOKEN = "<databricks-token>"
def DBURL = "https://<databricks-instance>"
def SCRIPTPATH = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH = "${GITREPO}/Workspace"
def LIBRARYPATH = "${GITREPO}/Libraries"
def BUILDPATH = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH = "/Shared/<path>"
def DBFSPATH = "dbfs:<dbfs-path>"
def CLUSTERID = "<cluster-id>"
def CONDAPATH = "<conda-path>"
def CONDAENV = "<conda-env>"
stage('Setup') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
# Configure Conda environment for deployment & testing
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Configure Databricks CLI for deployment
echo "${DBURL}
$TOKEN" | databricks configure --token
# Configure Databricks Connect for testing
echo "${DBURL}
$TOKEN
${CLUSTERID}
0
15001" | databricks-connect configure
"""
}
}
stage('Checkout') { // for display purposes
echo "Pulling ${CURRENTRELEASE} Branch from Github"
git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}
stage('Run Unit Tests') {
try {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Python tests for libs
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
"""
} catch(err) {
step([$class: 'JUnitResultArchiver', testResults: '**/reports/junit/TEST-*.xml'])
if (currentBuild.result == 'UNSTABLE')
currentBuild.result = 'FAILURE'
throw err
}
}
stage('Package') {
sh """#!/bin/bash
# Enable Conda environment for packaging
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Package Python library to wheel
cd ${LIBRARYPATH}/python/dbxdemo
python3 setup.py sdist bdist_wheel
"""
}
stage('Build Artifact') {
sh """mkdir -p ${BUILDPATH}/Workspace
mkdir -p ${BUILDPATH}/Libraries/python
mkdir -p ${BUILDPATH}/Validation/Output
#Get modified files
git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}
# Get packaged libs
find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/
# Generate artifact
tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
"""
archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}
stage('Deploy') {
sh """#!/bin/bash
# Enable Conda environment for deployment
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Use Databricks CLI to deploy notebooks
databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}
dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
"""
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
#Get space delimited list of libraries
LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")
#Script to uninstall, restart the cluster if needed, and install the library
python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--libs=\$LIBS\
--dbfspath=${DBFSPATH}
"""
}
}
stage('Run Integration Tests') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--localpath=${NOTEBOOKPATH}/VALIDATION\
--workspacepath=${WORKSPACEPATH}/VALIDATION\
--outfilepath=${OUTFILEPATH}
"""
}
sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
"""
}
stage('Report Test Results') {
sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
touch ${TESTRESULTPATH}/TEST-*.xml
"""
junit "**/reports/junit/*.xml"
}
}
The remainder of this article discusses each step in the Pipeline.
Define environment variables
You can define environment variables to allow the Pipeline stages to be used in different Pipelines.
Note
As a security best practice, when authenticating with automated tools, systems, scripts, and apps, Databricks recommends you use access tokens belonging to service principals instead of workspace users. To create access tokens for service principals, see Manage access tokens for a service principal.
GITREPO: local path to the root of the git repository
GITREPOREMOTE: web URL of the git repository
GITHUBCREDID: Jenkins credential ID for the GitHub personal access token
CURRENTRELEASE: deployment branch
DBTOKEN: Jenkins credential ID for the Databricks personal access token
DBURL: web URL of the Databricks workspace
SCRIPTPATH: local path to the git project directory for automation scripts
NOTEBOOKPATH: local path to the git project directory for notebooks
LIBRARYPATH: local path to the git project directory for library code or other DBFS code
BUILDPATH: local path to the directory for build artifacts
OUTFILEPATH: local path to the JSON result files generated by the automated tests
TESTRESULTPATH: local path to the directory for JUnit test result summaries
WORKSPACEPATH: Databricks workspace path for notebooks
DBFSPATH: Databricks DBFS path for libraries and non-notebook code
CLUSTERID: ID of the Databricks cluster on which to run the tests
CONDAPATH: path to the Conda installation
CONDAENV: name of the Conda environment containing the build dependency libraries
Set up the pipeline
In the Setup stage, you configure the Databricks CLI and Databricks Connect with connection information.
def GITREPO = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE = "https://github.com/<repo>"
def GITHUBCREDID = "<github-token>"
def CURRENTRELEASE = "<release>"
def DBTOKEN = "<databricks-token>"
def DBURL = "https://<databricks-instance>"
def SCRIPTPATH = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH = "${GITREPO}/Workspace"
def LIBRARYPATH = "${GITREPO}/Libraries"
def BUILDPATH = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH = "/Shared/<path>"
def DBFSPATH = "dbfs:<dbfs-path>"
def CLUSTERID = "<cluster-id>"
def CONDAPATH = "<conda-path>"
def CONDAENV = "<conda-env>"
stage('Setup') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
# Configure Conda environment for deployment & testing
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Configure Databricks CLI for deployment
echo "${DBURL}
$TOKEN" | databricks configure --token
# Configure Databricks Connect for testing
echo "${DBURL}
$TOKEN
${CLUSTERID}
0
15001" | databricks-connect configure
"""
}
}
Get the latest changes
The Checkout stage downloads code from the designated branch to the execution agent using a Jenkins plugin:
stage('Checkout') { // for display purposes
echo "Pulling ${CURRENTRELEASE} Branch from Github"
git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}
Develop unit tests
There are a few different options when deciding how to unit test your code. For library code developed outside a Databricks notebook, the process is like traditional software development practices. You write unit tests using a testing framework, like the Python pytest module, and the test results are stored in JUnit-formatted XML files.
The Databricks process differs in that the code being tested is Apache Spark code intended to be executed on a Spark cluster, which is often running locally or, in this case, on Databricks. To accommodate this requirement, you use Databricks Connect. Because the SDK was configured earlier, no changes to the test code are required to execute the tests on Databricks clusters. You installed Databricks Connect in a Conda virtual environment. Once the Conda environment is activated, the tests are executed using the Python tool pytest, to which you provide the locations of the tests and the resulting output files.
Test library code using Databricks Connect
stage('Run Unit Tests') {
try {
sh """#!/bin/bash
# Enable Conda environment for tests
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Python tests for libs
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
"""
} catch(err) {
step([$class: 'JUnitResultArchiver', testResults: '**/reports/junit/TEST-*.xml'])
if (currentBuild.result == 'UNSTABLE')
currentBuild.result = 'FAILURE'
throw err
}
}
The following snippet is a library function that might be installed on a Databricks cluster. It is a simple function that adds a new column, populated by a literal, to an Apache Spark DataFrame.
# addcol.py
import pyspark.sql.functions as F

def with_status(df):
    return df.withColumn("status", F.lit("checked"))
This test passes a sample DataFrame object to the with_status function, defined in addcol.py. The result is then compared to a DataFrame object containing the expected values. If the values match, which in this case they will, the test passes.
# test-addcol.py
import pytest

from dbxdemo.spark import get_spark
from dbxdemo.addcol import with_status

class TestAppendCol(object):

    def test_with_status(self):
        source_data = [
            ("paula", "white", "paula.white@example.com"),
            ("john", "baer", "john.baer@example.com")
        ]
        source_df = get_spark().createDataFrame(
            source_data,
            ["first_name", "last_name", "email"]
        )

        actual_df = with_status(source_df)

        expected_data = [
            ("paula", "white", "paula.white@example.com", "checked"),
            ("john", "baer", "john.baer@example.com", "checked")
        ]
        expected_df = get_spark().createDataFrame(
            expected_data,
            ["first_name", "last_name", "email", "status"]
        )

        assert(expected_df.collect() == actual_df.collect())
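The test also imports get_spark from a dbxdemo.spark helper module that is not shown in this article. A minimal sketch of such a helper, which returns a shared SparkSession, might look like this:
# spark.py (minimal sketch of the dbxdemo.spark helper assumed by the test above)
from pyspark.sql import SparkSession

def get_spark():
    # With Databricks Connect configured, getOrCreate() returns a session
    # backed by the remote Databricks cluster.
    return SparkSession.builder.getOrCreate()
Because Databricks Connect was configured in the Setup stage, no test-specific connection settings are needed here.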
Package library code
In the Package stage, you package the library code into a Python wheel.
stage('Package') {
sh """#!/bin/bash
# Enable Conda environment for packaging
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Package Python library to wheel
cd ${LIBRARYPATH}/python/dbxdemo
python3 setup.py sdist bdist_wheel
"""
}
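This stage assumes that the dbxdemo project contains a setup.py, which is not shown in this article. A minimal sketch, with placeholder metadata you would replace with your own, might look like this:
# setup.py (minimal sketch; the package name and metadata are placeholders)
from setuptools import find_packages, setup

setup(
    name='dbxdemo',
    version='0.1.0',
    description='Example library deployed by the Jenkins pipeline',
    packages=find_packages(exclude=['tests', 'tests.*']),
    install_requires=[],  # add runtime dependencies here; pyspark is provided by the cluster
)
Note that building with bdist_wheel also requires the wheel package to be available in the Conda environment.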
Generate and store a deployment artifact
Building a deployment artifact for Databricks involves gathering all the new or updated code to be deployed to the appropriate Databricks environment. In the Build Artifact stage you add the notebook code to be deployed to the workspace, any whl libraries that were generated by the build process, and the test result summaries for archiving purposes. To do this you use git diff to flag all new files that have been included in the most recent git merge. This is only an example method, so the implementation in your Pipeline may differ, but the objective is to add all files intended for the current release.
stage('Build Artifact') {
sh """mkdir -p ${BUILDPATH}/Workspace
mkdir -p ${BUILDPATH}/Libraries/python
mkdir -p ${BUILDPATH}/Validation/Output
#Get Modified Files
git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}
# Get packaged libs
find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/
# Generate artifact
tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
"""
archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}
Deploy artifacts
In the Deploy stage, you use the Databricks CLI, which, like the Databricks Connect module used earlier, is installed in your Conda environment, so you must activate that environment for this shell session. You use the Workspace CLI and DBFS CLI to upload the notebooks and libraries, respectively:
databricks workspace import_dir <local build path> <remote workspace path>
dbfs cp -r <local build path> <remote dbfs path>
stage('Deploy') {
sh """#!/bin/bash
# Enable Conda environment for deployment
source ${CONDAPATH}/bin/activate ${CONDAENV}
# Use Databricks CLI to deploy notebooks
databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}
dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
"""
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """#!/bin/bash
#Get space delimited list of libraries
LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")
#Script to uninstall, restart the cluster if needed, and install the library
python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--libs=\$LIBS\
--dbfspath=${DBFSPATH}
"""
}
}
Installing a new version of a library on a Databricks cluster requires that you first uninstall the existing library. To do this, you invoke the Databricks REST API in a Python script to perform the following steps:
Check if the library is installed.
Uninstall the library.
Restart the cluster if any uninstalls were performed.
Wait until the cluster is running again before proceeding.
Install the library.
# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time

def main():
    workspace = ''
    token = ''
    clusterid = ''
    libs = ''
    dbfspath = ''

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:l:d:',
                                   ['workspace=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
    except getopt.GetoptError:
        print(
            'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(
                'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
            sys.exit()
        elif opt in ('-s', '--workspace'):
            workspace = arg
        elif opt in ('-t', '--token'):
            token = arg
        elif opt in ('-c', '--clusterid'):
            clusterid = arg
        elif opt in ('-l', '--libs'):
            libs = arg
        elif opt in ('-d', '--dbfspath'):
            dbfspath = arg

    print('-s is ' + workspace)
    print('-t is ' + token)
    print('-c is ' + clusterid)
    print('-l is ' + libs)
    print('-d is ' + dbfspath)

    libslist = libs.split()

    # Uninstall each library if it already exists on the cluster
    i = 0
    for lib in libslist:
        dbfslib = dbfspath + lib
        print(dbfslib + ' before:' + getLibStatus(workspace, token, clusterid, dbfslib))

        if (getLibStatus(workspace, token, clusterid, dbfslib) != 'not found'):
            print(dbfslib + " exists. Uninstalling.")
            i = i + 1
            values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

            resp = requests.post(workspace + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))
            runjson = resp.text
            d = json.loads(runjson)
            print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))

    # Restart the cluster if any libraries were uninstalled
    if i > 0:
        values = {'cluster_id': clusterid}
        print("Restarting cluster:" + clusterid)
        resp = requests.post(workspace + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))
        restartjson = resp.text
        print(restartjson)

        # Poll until the cluster is running again (or give up after roughly 5 minutes)
        p = 0
        waiting = True
        while waiting:
            time.sleep(30)
            clusterresp = requests.get(workspace + '/api/2.0/clusters/get?cluster_id=' + clusterid,
                                       auth=("token", token))
            clusterjson = clusterresp.text
            jsonout = json.loads(clusterjson)
            current_state = jsonout['state']
            print(clusterid + " state:" + current_state)
            if current_state in ['RUNNING', 'INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
                break
            p = p + 1

    # Install the libraries
    for lib in libslist:
        dbfslib = dbfspath + lib
        print("Installing " + dbfslib)
        values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

        resp = requests.post(workspace + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))
        runjson = resp.text
        d = json.loads(runjson)
        print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))

def getLibStatus(workspace, token, clusterid, dbfslib):
    resp = requests.get(workspace + '/api/2.0/libraries/cluster-status?cluster_id=' + clusterid, auth=("token", token))
    libjson = resp.text
    d = json.loads(libjson)

    if (d.get('library_statuses')):
        statuses = d['library_statuses']
        for status in statuses:
            if (status['library'].get('whl') and status['library']['whl'] == dbfslib):
                return status['status']
        # The library is not among those installed on the cluster
        return "not found"
    else:
        # No libraries found on the cluster
        return "not found"

if __name__ == '__main__':
    main()
Test notebook code using another notebook
Once the artifact has been deployed, it is important to run integration tests to ensure that all the code works together in the new environment. To do this you can run a notebook containing asserts to test the deployment. In this case you use the same test you used in the unit test, but now it imports the installed addcol module from the whl that was just installed on the cluster, as sketched below.
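The VALIDATION notebook itself is not shown in this article. A minimal, hypothetical sketch of what such a notebook might contain, assuming the dbxdemo wheel built earlier is installed on the cluster, is:
# Hypothetical cell from a VALIDATION notebook; spark is provided by the notebook environment
from dbxdemo.addcol import with_status

source_df = spark.createDataFrame(
    [("paula", "white", "paula.white@example.com")],
    ["first_name", "last_name", "email"]
)

# The run fails, and the failure surfaces in the JSON output returned by the
# REST API, if this assert does not hold.
assert with_status(source_df).collect()[0]["status"] == "checked"
Because the notebook runs as a one-time job, a failed assert causes the run's result_state to be FAILED, which evaluatenotebookruns.py checks later.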
To automate this test and include it in your CI/CD Pipeline, use the Databricks REST API to
run the notebook from the Jenkins server. This allows you to check whether the notebook
run passed or failed using pytest
. If the asserts in the notebook fail, this will be
shown in the JSON output returned by the REST API and subsequently in the JUnit test results.
stage('Run Integration Tests') {
withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
--token=$TOKEN\
--clusterid=${CLUSTERID}\
--localpath=${NOTEBOOKPATH}/VALIDATION\
--workspacepath=${WORKSPACEPATH}/VALIDATION\
--outfilepath=${OUTFILEPATH}
"""
}
sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
"""
}
This stage calls two Python automation scripts. The first script, executenotebook.py, runs the notebook using the Create and trigger a one-time run (POST /jobs/runs/submit) endpoint, which submits an anonymous job. Because this endpoint is asynchronous, the script uses the run ID initially returned by the REST call to poll for the status of the run. Once the run has completed, the JSON output is saved to the path specified by the arguments passed at invocation.
# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time

def main():
    workspace = ''
    token = ''
    clusterid = ''
    localpath = ''
    workspacepath = ''
    outfilepath = ''

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:l:w:o:',
                                   ['workspace=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
    except getopt.GetoptError:
        print(
            'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
        sys.exit(2)

    for opt, arg in opts:
        if opt == '-h':
            print(
                'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
            sys.exit()
        elif opt in ('-s', '--workspace'):
            workspace = arg
        elif opt in ('-t', '--token'):
            token = arg
        elif opt in ('-c', '--clusterid'):
            clusterid = arg
        elif opt in ('-l', '--localpath'):
            localpath = arg
        elif opt in ('-w', '--workspacepath'):
            workspacepath = arg
        elif opt in ('-o', '--outfilepath'):
            outfilepath = arg

    print('-s is ' + workspace)
    print('-t is ' + token)
    print('-c is ' + clusterid)
    print('-l is ' + localpath)
    print('-w is ' + workspacepath)
    print('-o is ' + outfilepath)

    # Generate array from walking local path
    notebooks = []
    for path, subdirs, files in os.walk(localpath):
        for name in files:
            fullpath = path + '/' + name
            # removes localpath to repo but keeps workspace path
            fullworkspacepath = workspacepath + path.replace(localpath, '')

            name, file_extension = os.path.splitext(fullpath)
            if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
                row = [fullpath, fullworkspacepath, 1]
                notebooks.append(row)

    # run each element in list
    for notebook in notebooks:
        nameonly = os.path.basename(notebook[0])
        workspacepath = notebook[1]

        name, file_extension = os.path.splitext(nameonly)

        # workpath removes extension
        fullworkspacepath = workspacepath + '/' + name

        print('Running job for:' + fullworkspacepath)
        values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}

        resp = requests.post(workspace + '/api/2.0/jobs/runs/submit',
                             data=json.dumps(values), auth=("token", token))
        runjson = resp.text
        print("runjson:" + runjson)
        d = json.loads(runjson)
        runid = d['run_id']

        # Poll the run until it reaches a terminal state (or give up after about 2 minutes)
        i = 0
        waiting = True
        while waiting:
            time.sleep(10)
            jobresp = requests.get(workspace + '/api/2.0/jobs/runs/get?run_id=' + str(runid),
                                   auth=("token", token))
            jobjson = jobresp.text
            print("jobjson:" + jobjson)
            j = json.loads(jobjson)
            current_state = j['state']['life_cycle_state']
            runid = j['run_id']
            if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
                break
            i = i + 1

        if outfilepath != '':
            file = open(outfilepath + '/' + str(runid) + '.json', 'w')
            file.write(json.dumps(j))
            file.close()

if __name__ == '__main__':
    main()
The second script, evaluatenotebookruns.py, defines the test_job_run function, which parses and evaluates the JSON to determine whether the assert statements within the notebook passed or failed. An additional test, test_performance, catches runs that take longer than expected.
# evaluatenotebookruns.py
import unittest
import json
import glob
import os

class TestJobOutput(unittest.TestCase):

    test_output_path = '#ENV#'

    def test_performance(self):
        path = self.test_output_path
        statuses = []

        for filename in glob.glob(os.path.join(path, '*.json')):
            print('Evaluating: ' + filename)
            data = json.load(open(filename))
            duration = data['execution_duration']
            if duration > 100000:
                status = 'FAILED'
            else:
                status = 'SUCCESS'
            statuses.append(status)

        self.assertFalse('FAILED' in statuses)

    def test_job_run(self):
        path = self.test_output_path
        statuses = []

        for filename in glob.glob(os.path.join(path, '*.json')):
            print('Evaluating: ' + filename)
            data = json.load(open(filename))
            status = data['state']['result_state']
            statuses.append(status)

        self.assertFalse('FAILED' in statuses)

if __name__ == '__main__':
    unittest.main()
As seen earlier in the unit test stage, you use pytest
to run the tests and generate the result summaries.
Publish test results
The JSON results are archived and the test results are published to Jenkins using the junit
Jenkins plugin. This enables you to visualize reports and dashboards related to the status of
the build process.
stage('Report Test Results') {
sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
touch ${TESTRESULTPATH}/TEST-*.xml
"""
junit "**/reports/junit/*.xml"
}

At this point, the CI/CD pipeline has completed an integration and deployment cycle. By automating this process, you can ensure that your code has been tested and deployed by an efficient, consistent, and repeatable process.