Continuous Integration and Delivery on Databricks Using Jenkins

Continuous integration and continuous delivery (CI/CD) refers to the process of developing and delivering software in short, frequent cycles through the use of automation pipelines. While this is by no means a new process, having been ubiquitous in traditional software engineering for decades, it is becoming an increasingly necessary process for data engineering and data science teams. In order for data products to be valuable, they must be delivered in a timely manner. Additionally, consumers must have confidence in the validity of outcomes within these products. By automating the building, testing, and deployment of code, development teams are able to deliver releases more frequently and reliably than they can with the more manual processes that are still prevalent across many data engineering and data science teams.

Continuous integration begins with the practice of committing your code frequently to a branch within a source code repository. Each commit is then merged with the commits from other developers to ensure that no conflicts were introduced. Changes are further validated by creating a build and running automated tests against that build. This process ultimately results in an artifact, or deployment bundle, that will eventually be deployed to a target environment, in this case a Databricks workspace.

Overview of a typical Databricks CI/CD pipeline

Though it can vary based on your needs, a typical configuration for a Databricks pipeline includes the following steps:

Continuous integration:

  1. Code
    1. Develop code and unit tests in a Databricks notebook or using an external IDE.
    2. Manually run tests.
    3. Commit code and tests to a git branch.
  2. Build
    1. Gather new and updated code and tests.
    2. Run automated tests.
    3. Build libraries and non-notebook Apache Spark code.
  3. Release: Generate a release artifact.

Continuous delivery:

  1. Deploy
    1. Deploy notebooks.
    2. Deploy libraries.
  2. Test: Run automated tests and report results.
  3. Operate: Programmatically schedule data engineering, analytics, and machine learning workflows.

Develop and commit your code

One of the first steps in designing a CI/CD pipeline is deciding on a code commit and branching strategy to manage the development and integration of new and updated code without adversely affecting the code currently in production. Part of this decision involves choosing a version control system (VCS) to contain your code and facilitate the promotion of that code. Databricks supports some VCS integrations which allow you to commit notebooks to a git repository.

If your version control system is not among those supported through direct notebook integration, or if you want more flexibility and control than the self-service git integration, you can use the Databricks CLI to export notebooks and commit them from your local machine. You can automate this with a script that you run from within a local git repository that is set up to sync with the appropriate remote repository. When executed, the script should:

  1. Check out the desired branch.
  2. Pull new changes from the remote branch.
  3. Export notebooks from the Databricks workspace using the Databricks Workspace CLI.
  4. Prompt the user for a commit message or use the default if one is not provided.
  5. Commit the updated notebooks to the local branch.
  6. Push the changes to the remote branch.

The following script performs these steps:

git checkout <branch>
git pull
databricks workspace export_dir --profile <profile> -o <path> ./Workspace

dt=`date '+%Y-%m-%d %H:%M:%S'`
msg_default="DB export on $dt"
read -p "Enter the commit comment [$msg_default]: " msg
msg=${msg:-$msg_default}
echo "$msg"

git add .
git commit -m "$msg"
git push
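
The same sequence can be sketched in Python for teams that prefer to drive the export from a Python script. This is only an illustrative sketch: the function names default_commit_message and export_and_commit_commands are hypothetical, and the commands are returned as argument lists rather than executed, so you can inspect them before running anything.

```python
from datetime import datetime


def default_commit_message(now):
    """Mirror the shell default: 'DB export on <timestamp>'."""
    return "DB export on " + now.strftime("%Y-%m-%d %H:%M:%S")


def export_and_commit_commands(branch, profile, workspace_path, message,
                               local_dir="./Workspace"):
    """Return the commands from the shell script, in order, as argument lists."""
    return [
        ["git", "checkout", branch],
        ["git", "pull"],
        # -o overwrites existing local files, as in the shell script
        ["databricks", "workspace", "export_dir", "--profile", profile,
         "-o", workspace_path, local_dir],
        ["git", "add", "."],
        ["git", "commit", "-m", message],
        ["git", "push"],
    ]
```

Each returned list can be passed to subprocess.run(command, check=True) so that the sequence stops at the first failing step.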

If you prefer to develop in an IDE rather than in Databricks notebooks, you can use the VCS integration features built into modern IDEs or the git CLI to commit your code.

Databricks provides Databricks Connect, an SDK that connects IDEs to Databricks clusters. This is especially useful when developing libraries, as it allows you to run and unit test your code on Databricks clusters without having to deploy that code. Refer to the Databricks Connect limitations to ensure your use case is supported.

Depending on your branching strategy and promotion process, the point at which a CI/CD pipeline will initiate a build will vary. However, committed code from various contributors will eventually be merged into a designated branch to be built and deployed. Branch management steps run outside of Databricks, using the interfaces provided by the version control system.

There are numerous CI/CD tools you can use to manage and execute your pipeline. This article illustrates how to use the Jenkins automation server. CI/CD is a design pattern, so the steps and stages outlined in this article should transfer with a few changes to the pipeline definition language in each tool. Furthermore, much of the code in this example pipeline runs standard Python code, which you can invoke in other tools.

Configure your agent

Jenkins uses a master service for coordination and one or more execution agents. In this example you use the default permanent agent node included with the Jenkins server. You must manually install the following tools and packages, which the pipeline requires, on the agent (in this case, the Jenkins server):

  • Conda: an open source Python environment management system.
  • Python 3.7.3: used to run tests, build a deployment wheel, and execute deployment scripts. The Python version matters because the tests require the version running on the agent to match that of the Databricks cluster. This example uses Databricks Runtime 6.1, which contains Python 3.7.
  • Python libraries: requests, databricks-connect, databricks-cli, and pytest.
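
Because a version mismatch between the agent and the cluster can be hard to diagnose from test failures alone, it may help to check the agent's Python version before the pipeline runs. The following is a minimal sketch, assuming that comparing the major and minor components is the relevant check; the function names are hypothetical and the default of (3, 7) simply reflects the runtime used in this example.

```python
import sys


def same_minor_version(agent_version, cluster_version):
    """Compare only the major and minor components, for example (3, 7)."""
    return tuple(agent_version[:2]) == tuple(cluster_version[:2])


def check_agent_python(cluster_version=(3, 7)):
    """Raise if the interpreter running this script differs from the cluster's."""
    agent = tuple(sys.version_info[:3])
    if not same_minor_version(agent, cluster_version):
        raise RuntimeError(
            "Agent Python %s does not match cluster Python %s"
            % (".".join(map(str, agent)), ".".join(map(str, cluster_version)))
        )
    return agent
```

Calling check_agent_python() at the start of a deployment script would fail fast on an agent whose interpreter does not match.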

Design the pipeline

Jenkins provides a few different project types to create CI/CD pipelines. This example implements a Jenkins Pipeline. Jenkins Pipelines provide an interface to define stages in a Pipeline using Groovy code to call and configure Jenkins plugins.

You write a Pipeline definition in a text file (called a Jenkinsfile) which in turn is checked into a project’s source control repository. For more information, see Jenkins Pipeline. Here is an example Pipeline:

// Jenkinsfile
node {
  def GITREPO         = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
  def GITREPOREMOTE   = "https://github.com/<repo>"
  def GITHUBCREDID    = "<github-token>"
  def CURRENTRELEASE  = "<release>"
  def DBTOKEN         = "<databricks-token>"
  def DBURL           = "https://<databricks-instance>"
  def SCRIPTPATH      = "${GITREPO}/Automation/Deployments"
  def NOTEBOOKPATH    = "${GITREPO}/Workspace"
  def LIBRARYPATH     = "${GITREPO}/Libraries"
  def BUILDPATH       = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
  def OUTFILEPATH     = "${BUILDPATH}/Validation/Output"
  def TESTRESULTPATH  = "${BUILDPATH}/Validation/reports/junit"
  def WORKSPACEPATH   = "/Shared/<path>"
  def DBFSPATH        = "dbfs:<dbfs-path>"
  def CLUSTERID       = "<cluster-id>"
  def CONDAPATH       = "<conda-path>"
  def CONDAENV        = "<conda-env>"

  stage('Setup') {
      withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """#!/bin/bash
            # Configure Conda environment for deployment & testing
            source ${CONDAPATH}/bin/activate ${CONDAENV}

            # Configure Databricks CLI for deployment
            echo "${DBURL}
            $TOKEN" | databricks configure --token

            # Configure Databricks Connect for testing
            echo "${DBURL}
            $TOKEN
            ${CLUSTERID}
            0
            15001" | databricks-connect configure
           """
      }
  }
  stage('Checkout') { // for display purposes
    echo "Pulling ${CURRENTRELEASE} Branch from Github"
    git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
  }
  stage('Run Unit Tests') {
    try {
        sh """#!/bin/bash

              # Enable Conda environment for tests
              source ${CONDAPATH}/bin/activate ${CONDAENV}

              # Python tests for libs
              python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
           """
    } catch(err) {
      step([$class: 'JUnitResultArchiver', testResults: '**/reports/junit/*.xml'])
      if (currentBuild.result == 'UNSTABLE')
        currentBuild.result = 'FAILURE'
      throw err
    }
  }
  stage('Package') {
    sh """#!/bin/bash

          # Enable Conda environment for tests
          source ${CONDAPATH}/bin/activate ${CONDAENV}

          # Package Python library to wheel
          cd ${LIBRARYPATH}/python/dbxdemo
          python3 setup.py sdist bdist_wheel
       """
  }
  stage('Build Artifact') {
    sh """mkdir -p ${BUILDPATH}/Workspace
          mkdir -p ${BUILDPATH}/Libraries/python
          mkdir -p ${BUILDPATH}/Validation/Output
          #Get modified files
          git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}

          # Get packaged libs
          find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/

          # Generate artifact
          tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
       """
    archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
  }
  stage('Deploy') {
    sh """#!/bin/bash
          # Enable Conda environment for tests
          source ${CONDAPATH}/bin/activate ${CONDAENV}

          # Use Databricks CLI to deploy notebooks
          databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}

          dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
       """
    withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """#!/bin/bash

              #Get space delimited list of libraries
              LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")

              #Script to uninstall, reboot if needed & install library
              python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
                        --token=$TOKEN\
                        --clusterid=${CLUSTERID}\
                        --libs="\$LIBS"\
                        --dbfspath=${DBFSPATH}
           """
    }
  }
  stage('Run Integration Tests') {
    withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
        sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
                        --token=$TOKEN\
                        --clusterid=${CLUSTERID}\
                        --localpath=${NOTEBOOKPATH}/VALIDATION\
                        --workspacepath=${WORKSPACEPATH}/VALIDATION\
                        --outfilepath=${OUTFILEPATH}
           """
    }
    sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
          python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
       """
  }
  stage('Report Test Results') {
    sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
          touch ${TESTRESULTPATH}/TEST-*.xml
       """
    junit "**/reports/junit/*.xml"
  }
}

The remainder of this article discusses each step in the Pipeline.

Define environment variables

You can define environment variables to allow the Pipeline stages to be used in different Pipelines.

  • GITREPO: local path to the git repository root
  • GITREPOREMOTE: Web URL of git repository
  • GITHUBCREDID: Jenkins credential ID for the GitHub personal access token
  • CURRENTRELEASE: deployment branch
  • DBTOKEN: Jenkins credential ID for the Databricks personal access token
  • DBURL: Web URL for Databricks workspace
  • SCRIPTPATH: local path to the git project directory for automation scripts
  • NOTEBOOKPATH: local path to the git project directory for notebooks
  • LIBRARYPATH: local path to the git project directory for library code or other DBFS code
  • BUILDPATH: local path to the directory for build artifacts
  • OUTFILEPATH: local path to the JSON result files generated from automated tests
  • TESTRESULTPATH: local path to the directory for JUnit test result summaries
  • WORKSPACEPATH: Databricks workspace path for notebooks
  • DBFSPATH: Databricks DBFS path for libraries and non-notebook code
  • CLUSTERID: Databricks cluster ID to run tests
  • CONDAPATH: path to Conda installation
  • CONDAENV: name for Conda environment containing build dependency libraries
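
Several of these variables are derived from others, which is easier to see when the derivation is written out. The sketch below reproduces the path composition from the Jenkinsfile in Python, purely for illustration; the function name pipeline_paths is hypothetical, and the default Jenkins workspace root is the one used in this example.

```python
import posixpath


def pipeline_paths(job_name, build_number,
                   jenkins_workspace="/var/lib/jenkins/workspace"):
    """Derive the local paths the same way the Jenkinsfile does."""
    git_repo = posixpath.join(jenkins_workspace, job_name)
    build_path = posixpath.join(
        git_repo, "Builds", "%s-%s" % (job_name, build_number))
    return {
        "GITREPO": git_repo,
        "SCRIPTPATH": posixpath.join(git_repo, "Automation", "Deployments"),
        "NOTEBOOKPATH": posixpath.join(git_repo, "Workspace"),
        "LIBRARYPATH": posixpath.join(git_repo, "Libraries"),
        "BUILDPATH": build_path,
        "OUTFILEPATH": posixpath.join(build_path, "Validation", "Output"),
        "TESTRESULTPATH": posixpath.join(
            build_path, "Validation", "reports", "junit"),
    }
```

Every build writes to its own BUILDPATH because the build number is part of the directory name, so results from earlier builds are not overwritten.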

Set up the Pipeline

In the Setup stage you configure Databricks CLI and Databricks Connect with connection information.

def GITREPO         = "/var/lib/jenkins/workspace/${env.JOB_NAME}"
def GITREPOREMOTE   = "https://github.com/<repo>"
def GITHUBCREDID    = "<github-token>"
def CURRENTRELEASE  = "<release>"
def DBTOKEN         = "<databricks-token>"
def DBURL           = "https://<databricks-instance>"
def SCRIPTPATH      = "${GITREPO}/Automation/Deployments"
def NOTEBOOKPATH    = "${GITREPO}/Workspace"
def LIBRARYPATH     = "${GITREPO}/Libraries"
def BUILDPATH       = "${GITREPO}/Builds/${env.JOB_NAME}-${env.BUILD_NUMBER}"
def OUTFILEPATH     = "${BUILDPATH}/Validation/Output"
def TESTRESULTPATH  = "${BUILDPATH}/Validation/reports/junit"
def WORKSPACEPATH   = "/Shared/<path>"
def DBFSPATH        = "dbfs:<dbfs-path>"
def CLUSTERID       = "<cluster-id>"
def CONDAPATH       = "<conda-path>"
def CONDAENV        = "<conda-env>"


stage('Setup') {
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
  sh """#!/bin/bash
      # Configure Conda environment for deployment & testing
      source ${CONDAPATH}/bin/activate ${CONDAENV}

      # Configure Databricks CLI for deployment
      echo "${DBURL}
      $TOKEN" | databricks configure --token

      # Configure Databricks Connect for testing
      echo "${DBURL}
      $TOKEN
      ${CLUSTERID}
      0
      15001" | databricks-connect configure
     """
  }
}

Get the latest changes

The Checkout stage downloads code from the designated branch to the execution agent using a Jenkins plugin:

stage('Checkout') { // for display purposes
  echo "Pulling ${CURRENTRELEASE} Branch from Github"
  git branch: CURRENTRELEASE, credentialsId: GITHUBCREDID, url: GITREPOREMOTE
}

Develop unit tests

There are a few different options when deciding how to unit test your code. For library code developed outside a Databricks notebook, the process is like traditional software development practices. You write a unit test using a testing framework, like the Python pytest module, and JUnit-formatted XML files store the test results.

The Databricks process differs in that the code being tested is Apache Spark code intended to be executed on a Spark cluster, often running locally or, in this case, on Databricks. To accommodate this requirement, you use Databricks Connect. Since the SDK was configured earlier, no changes to the test code are required to execute the tests on Databricks clusters. You installed Databricks Connect in a Conda virtual environment. Once the Conda environment is activated, the tests are executed using the Python tool, pytest, to which you provide the locations for the tests and the resulting output files.

Test library code using Databricks Connect

stage('Run Unit Tests') {
  try {
      sh """#!/bin/bash
         # Enable Conda environment for tests
         source ${CONDAPATH}/bin/activate ${CONDAENV}

         # Python tests for libs
         python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-libout.xml ${LIBRARYPATH}/python/dbxdemo/test*.py || true
         """
  } catch(err) {
    step([$class: 'JUnitResultArchiver', testResults: '**/reports/junit/*.xml'])
    if (currentBuild.result == 'UNSTABLE')
      currentBuild.result = 'FAILURE'
    throw err
  }
}

The following snippet is a library function that might be installed on a Databricks cluster. It is a simple function that adds a new column, populated by a literal, to an Apache Spark DataFrame.

# appendcol.py
import pyspark.sql.functions as F

def with_status(df):
    return df.withColumn("status", F.lit("checked"))

This test passes a small sample DataFrame to the with_status function, defined in appendcol.py. The result is then compared to a DataFrame object containing the expected values. If the values match, which in this case they will, the test passes.

# test-addcol.py
import pytest


from dbxdemo.spark import get_spark
from dbxdemo.appendcol import with_status


class TestAppendCol(object):

  def test_with_status(self):
    source_data = [
        ("paula", "white", "paula.white@example.com"),
        ("john", "baer", "john.baer@example.com")
    ]
    source_df = get_spark().createDataFrame(
        source_data,
        ["first_name", "last_name", "email"]
    )

    actual_df = with_status(source_df)

    expected_data = [
        ("paula", "white", "paula.white@example.com", "checked"),
        ("john", "baer", "john.baer@example.com", "checked")
    ]
    expected_df = get_spark().createDataFrame(
        expected_data,
        ["first_name", "last_name", "email", "status"]
    )

    assert(expected_df.collect() == actual_df.collect())
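
The test imports get_spark from dbxdemo.spark, a module that is not shown in this article. A minimal sketch of what such a helper might look like follows, assuming it only needs to return a SparkSession; the actual helper in the example project may differ. With Databricks Connect configured as in the Setup stage, the session returned by the builder runs against the configured Databricks cluster.

```python
# spark.py (hypothetical contents of dbxdemo/spark.py)


def get_spark():
    """Return the active SparkSession, creating one if none exists."""
    # Imported lazily so that importing this module does not itself
    # require a Spark installation.
    from pyspark.sql import SparkSession

    return SparkSession.builder.getOrCreate()
```

Keeping session creation in one helper means the tests do not need to know whether Spark is running locally or on a Databricks cluster.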

Package library code

In the Package stage you package the library code into a Python wheel.

stage('Package') {
  sh """#!/bin/bash
      # Enable Conda environment for tests
      source ${CONDAPATH}/bin/activate ${CONDAENV}

      # Package Python library to wheel
      cd ${LIBRARYPATH}/python/dbxdemo
      python3 setup.py sdist bdist_wheel
     """
}

Generate and store a deployment artifact

Building a deployment artifact for Databricks involves gathering all the new or updated code to be deployed to the appropriate Databricks environment. In the Build Artifact stage you add the notebook code to be deployed to the workspace, any whl libraries that were generated by the build process, as well as the result summaries for the tests, for archiving purposes. To do this you use git diff to list the files that were added, modified, or renamed in the most recent git merge. This is only an example method, so the implementation in your Pipeline may differ, but the objective is to add all files intended for the current release.

stage('Build Artifact') {
  sh """mkdir -p ${BUILDPATH}/Workspace
        mkdir -p ${BUILDPATH}/Libraries/python
        mkdir -p ${BUILDPATH}/Validation/Output
        #Get Modified Files
        git diff --name-only --diff-filter=AMR HEAD^1 HEAD | xargs -I '{}' cp --parents -r '{}' ${BUILDPATH}

        # Get packaged libs
        find ${LIBRARYPATH} -name '*.whl' | xargs -I '{}' cp '{}' ${BUILDPATH}/Libraries/python/

        # Generate artifact
        tar -czvf Builds/latest_build.tar.gz ${BUILDPATH}
     """
  archiveArtifacts artifacts: 'Builds/latest_build.tar.gz'
}
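
To make the effect of --diff-filter=AMR concrete, the following sketch applies the same selection in Python. It assumes input in the format printed by git diff --name-status (a status code, a tab, and one or two paths) rather than the --name-only output used in the stage, because the status codes make the filtering visible. The function name files_to_package is hypothetical.

```python
def files_to_package(name_status_output, keep=("A", "M", "R")):
    """Select added, modified, and renamed paths from name-status output."""
    selected = []
    for line in name_status_output.splitlines():
        if not line.strip():
            continue
        fields = line.split("\t")
        # Rename entries carry a similarity score, for example 'R100'
        status = fields[0][:1]
        if status in keep:
            # For renames the last field is the new path
            selected.append(fields[-1])
    return selected
```

Deleted files are skipped, which matches the stage above: a file that no longer exists cannot be copied into the build directory.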

Deploy artifacts

In the Deploy stage you use the Databricks CLI, which, like the Databricks Connect module used earlier, is installed in your Conda environment, so you must activate it for this shell session. You use the Workspace CLI and DBFS CLI to upload the notebooks and libraries, respectively:

databricks workspace import_dir <local build path> <remote workspace path>
dbfs cp -r <local build path> <remote dbfs path>

stage('Deploy') {
  sh """#!/bin/bash
        # Enable Conda environment for tests
        source ${CONDAPATH}/bin/activate ${CONDAENV}

        # Use Databricks CLI to deploy notebooks
        databricks workspace import_dir ${BUILDPATH}/Workspace ${WORKSPACEPATH}

        dbfs cp -r ${BUILDPATH}/Libraries/python ${DBFSPATH}
     """
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
    sh """#!/bin/bash

        #Get space delimited list of libraries
        LIBS=\$(find ${BUILDPATH}/Libraries/python/ -name '*.whl' | sed 's#.*/##' | paste -sd " ")

        #Script to uninstall, reboot if needed & install library
        python3 ${SCRIPTPATH}/installWhlLibrary.py --workspace=${DBURL}\
                  --token=$TOKEN\
                  --clusterid=${CLUSTERID}\
                  --libs="\$LIBS"\
                  --dbfspath=${DBFSPATH}
       """
  }
}
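
The find, sed, and paste pipeline in this stage produces a space-delimited list of wheel file names. The sketch below shows the same idea in Python, together with a helper that joins a file name to a DBFS directory whether or not the directory ends with a slash. All three function names are hypothetical, and the results are sorted only to make the output deterministic.

```python
from pathlib import Path


def wheel_names(library_dir):
    """Return the file names of all wheels under library_dir, recursively."""
    return sorted(p.name for p in Path(library_dir).rglob("*.whl"))


def libs_argument(library_dir):
    """Build the space-delimited value passed as --libs."""
    return " ".join(wheel_names(library_dir))


def dbfs_path(dbfspath, name):
    """Join a DBFS directory and a file name with exactly one slash."""
    return dbfspath.rstrip("/") + "/" + name
```

Joining paths explicitly avoids depending on whether the configured DBFSPATH value happens to end with a slash.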

Installing a new version of a library on a Databricks cluster requires that you first uninstall the existing library. To do this, you invoke the Databricks REST API in a Python script to perform the following steps:

  1. Check if the library is installed.
  2. Uninstall the library.
  3. Restart the cluster if any uninstalls were performed.
    1. Wait until the cluster is running again before proceeding.
  4. Install the library.

# installWhlLibrary.py
#!/usr/bin/python3
import json
import requests
import sys
import getopt
import time

def main():
  workspace = ''
  token = ''
  clusterid = ''
  libs = ''
  dbfspath = ''

  try:
      opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:l:d:',
                                 ['workspace=', 'token=', 'clusterid=', 'libs=', 'dbfspath='])
  except getopt.GetoptError:
      print(
          'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
      sys.exit(2)

  for opt, arg in opts:
      if opt == '-h':
          print(
              'installWhlLibrary.py -s <workspace> -t <token> -c <clusterid> -l <libs> -d <dbfspath>')
          sys.exit()
      elif opt in ('-s', '--workspace'):
          workspace = arg
      elif opt in ('-t', '--token'):
          token = arg
      elif opt in ('-c', '--clusterid'):
          clusterid = arg
      elif opt in ('-l', '--libs'):
          libs=arg
      elif opt in ('-d', '--dbfspath'):
          dbfspath=arg

  print('-s is ' + workspace)
  # The token is a secret, so report only whether it was supplied
  print('-t is ' + ('<set>' if token else '<empty>'))
  print('-c is ' + clusterid)
  print('-l is ' + libs)
  print('-d is ' + dbfspath)

  libslist = libs.split()

  # Uninstall Library if exists on cluster
  i=0
  for lib in libslist:
      dbfslib = dbfspath + lib
      print(dbfslib + ' before:' + getLibStatus(workspace, token, clusterid, dbfslib))

      if (getLibStatus(workspace, token, clusterid, dbfslib) != 'not found'):
          print(dbfslib + " exists. Uninstalling.")
          i = i + 1
          values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

          resp = requests.post(workspace + '/api/2.0/libraries/uninstall', data=json.dumps(values), auth=("token", token))
          runjson = resp.text
          d = json.loads(runjson)
          print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))

  # Restart if libraries uninstalled
  if i > 0:
      values = {'cluster_id': clusterid}
      print("Restarting cluster:" + clusterid)
      resp = requests.post(workspace + '/api/2.0/clusters/restart', data=json.dumps(values), auth=("token", token))
      restartjson = resp.text
      print(restartjson)

      p = 0
      waiting = True
      while waiting:
          time.sleep(30)
          clusterresp = requests.get(workspace + '/api/2.0/clusters/get?cluster_id=' + clusterid,
                                 auth=("token", token))
          clusterjson = clusterresp.text
          jsonout = json.loads(clusterjson)
          current_state = jsonout['state']
          print(clusterid + " state:" + current_state)
          if current_state in ['RUNNING','INTERNAL_ERROR', 'SKIPPED'] or p >= 10:
              break
          p = p + 1

  # Install Libraries
  for lib in libslist:
      dbfslib = dbfspath + lib
      print("Installing " + dbfslib)
      values = {'cluster_id': clusterid, 'libraries': [{'whl': dbfslib}]}

      resp = requests.post(workspace + '/api/2.0/libraries/install', data=json.dumps(values), auth=("token", token))
      runjson = resp.text
      d = json.loads(runjson)
      print(dbfslib + ' after:' + getLibStatus(workspace, token, clusterid, dbfslib))


def getLibStatus(workspace, token, clusterid, dbfslib):
  resp = requests.get(workspace + '/api/2.0/libraries/cluster-status?cluster_id='+ clusterid, auth=("token", token))
  libjson = resp.text
  d = json.loads(libjson)

  # Compare every reported library before giving up, so that a different
  # whl listed first does not end the search early.
  for status in d.get('library_statuses', []):
      if status['library'].get('whl') == dbfslib:
          return status['status']

  # No matching library on the cluster
  return "not found"

if __name__ == '__main__':
  main()
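
The restart loop in this script stops polling after a fixed number of attempts and then continues to the install step regardless of the cluster state. One alternative is to put the polling in a small helper that raises when the expected state is not reached. This is a sketch of that design choice, not part of the original script: the name wait_for_state and its parameters are hypothetical, and get_state stands for any function that returns the current state string, such as one wrapping the clusters/get call above.

```python
import time


def wait_for_state(get_state, done_states, interval_seconds=30,
                   max_attempts=10, sleep=time.sleep):
    """Poll get_state() until it returns one of done_states, or raise."""
    state = None
    for _ in range(max_attempts):
        # Wait first, as the original loop does, to give the restart time
        sleep(interval_seconds)
        state = get_state()
        if state in done_states:
            return state
    raise TimeoutError(
        "state %r not in %s after %d attempts"
        % (state, sorted(done_states), max_attempts)
    )
```

The sleep parameter is injectable so that the helper can be exercised in tests without real delays.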

Test notebook code using another notebook

Once the artifact has been deployed, it is important to run integration tests to ensure all the code is working together in the new environment. To do this you can run a notebook containing asserts to test the deployment. In this case you are using the same test you used in the unit test, but now it is importing the installed appendcol library from the whl that was just installed on the cluster.

To automate this test and include it in your CI/CD Pipeline, use the Databricks REST API to run the notebook from the Jenkins server. This allows you to check whether the notebook run passed or failed using pytest. If the asserts in the notebook fail, this will be shown in the JSON output returned by the REST API and subsequently in the JUnit test results.

stage('Run Integration Tests') {
  withCredentials([string(credentialsId: DBTOKEN, variable: 'TOKEN')]) {
      sh """python3 ${SCRIPTPATH}/executenotebook.py --workspace=${DBURL}\
                      --token=$TOKEN\
                      --clusterid=${CLUSTERID}\
                      --localpath=${NOTEBOOKPATH}/VALIDATION\
                      --workspacepath=${WORKSPACEPATH}/VALIDATION\
                      --outfilepath=${OUTFILEPATH}
         """
  }
  sh """sed -i -e 's #ENV# ${OUTFILEPATH} g' ${SCRIPTPATH}/evaluatenotebookruns.py
        python3 -m pytest --junit-xml=${TESTRESULTPATH}/TEST-notebookout.xml ${SCRIPTPATH}/evaluatenotebookruns.py || true
     """
}

This stage calls two Python automation scripts. The first script, executenotebook.py, runs the notebook using the Jobs runs submit endpoint, which submits an anonymous job. Since this endpoint is asynchronous, the script uses the run ID initially returned by the REST call to poll for the status of the run. Once the run has completed, the JSON output is saved to the path specified by the outfilepath argument passed at invocation.

# executenotebook.py
#!/usr/bin/python3
import json
import requests
import os
import sys
import getopt
import time


def main():
  workspace = ''
  token = ''
  clusterid = ''
  localpath = ''
  workspacepath = ''
  outfilepath = ''

  try:
      opts, args = getopt.getopt(sys.argv[1:], 'hs:t:c:l:w:o:',
                                 ['workspace=', 'token=', 'clusterid=', 'localpath=', 'workspacepath=', 'outfilepath='])
  except getopt.GetoptError:
      print(
          'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
      sys.exit(2)

  for opt, arg in opts:
      if opt == '-h':
          print(
              'executenotebook.py -s <workspace> -t <token> -c <clusterid> -l <localpath> -w <workspacepath> -o <outfilepath>')
          sys.exit()
      elif opt in ('-s', '--workspace'):
          workspace = arg
      elif opt in ('-t', '--token'):
          token = arg
      elif opt in ('-c', '--clusterid'):
          clusterid = arg
      elif opt in ('-l', '--localpath'):
          localpath = arg
      elif opt in ('-w', '--workspacepath'):
          workspacepath = arg
      elif opt in ('-o', '--outfilepath'):
          outfilepath = arg

  print('-s is ' + workspace)
  # The token is a secret, so report only whether it was supplied
  print('-t is ' + ('<set>' if token else '<empty>'))
  print('-c is ' + clusterid)
  print('-l is ' + localpath)
  print('-w is ' + workspacepath)
  print('-o is ' + outfilepath)

  # Generate array from walking local path
  notebooks = []
  for path, subdirs, files in os.walk(localpath):
      for name in files:
          fullpath = path + '/' + name
          # removes localpath to repo but keeps workspace path
          fullworkspacepath = workspacepath + path.replace(localpath, '')

          name, file_extension = os.path.splitext(fullpath)
          if file_extension.lower() in ['.scala', '.sql', '.r', '.py']:
              row = [fullpath, fullworkspacepath, 1]
              notebooks.append(row)

  # run each element in list
  for notebook in notebooks:
      nameonly = os.path.basename(notebook[0])
      workspacepath = notebook[1]

      name, file_extension = os.path.splitext(nameonly)

      # workpath removes extension
      fullworkspacepath = workspacepath + '/' + name

      print('Running job for:' + fullworkspacepath)
      values = {'run_name': name, 'existing_cluster_id': clusterid, 'timeout_seconds': 3600, 'notebook_task': {'notebook_path': fullworkspacepath}}

      resp = requests.post(workspace + '/api/2.0/jobs/runs/submit',
                           data=json.dumps(values), auth=("token", token))
      runjson = resp.text
      print("runjson:" + runjson)
      d = json.loads(runjson)
      runid = d['run_id']

      i=0
      waiting = True
      while waiting:
          time.sleep(10)
          jobresp = requests.get(workspace + '/api/2.0/jobs/runs/get?run_id='+str(runid),
                           auth=("token", token))
          jobjson = jobresp.text
          print("jobjson:" + jobjson)
          j = json.loads(jobjson)
          current_state = j['state']['life_cycle_state']
          runid = j['run_id']
          if current_state in ['TERMINATED', 'INTERNAL_ERROR', 'SKIPPED'] or i >= 12:
              break
          i=i+1

      if outfilepath != '':
          # Save the final run JSON so the evaluation script can read it
          with open(outfilepath + '/' +  str(runid) + '.json', 'w') as file:
              file.write(json.dumps(j))

if __name__ == '__main__':
  main()
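
The part of this script that is easiest to get wrong is the mapping from local notebook files to workspace paths, since the workspace path drops the file extension and must preserve subdirectories. The following sketch isolates that mapping as a function that can be tested without a Databricks workspace. The function name notebook_workspace_paths is hypothetical, and the set of extensions is the one used in the script above.

```python
import os

NOTEBOOK_EXTENSIONS = {".scala", ".sql", ".r", ".py"}


def notebook_workspace_paths(localpath, workspacepath):
    """Map notebook source files under localpath to workspace notebook paths."""
    paths = []
    for root, _dirs, files in os.walk(localpath):
        relative = os.path.relpath(root, localpath)
        for filename in files:
            stem, ext = os.path.splitext(filename)
            if ext.lower() not in NOTEBOOK_EXTENSIONS:
                continue
            parts = [workspacepath.rstrip("/")]
            if relative != ".":
                # Keep the subdirectory structure in the workspace path
                parts.extend(relative.split(os.sep))
            # Workspace notebook paths do not include the file extension
            parts.append(stem)
            paths.append("/".join(parts))
    return sorted(paths)
```

Each returned path could then be used as the notebook_path value in a runs submit request.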

The second script, evaluatenotebookruns.py, defines the test_job_run function, which parses and evaluates the JSON to determine if the assert statements within the notebook passed or failed. An additional test, test_performance, catches tests that run longer than expected.

# evaluatenotebookruns.py
import unittest
import json
import glob
import os

class TestJobOutput(unittest.TestCase):

  test_output_path = '#ENV#'

  def test_performance(self):
      path = self.test_output_path
      statuses = []

      for filename in glob.glob(os.path.join(path, '*.json')):
          print('Evaluating: ' + filename)
          data = json.load(open(filename))
          duration = data['execution_duration']
          if duration > 100000:
              status = 'FAILED'
          else:
              status = 'SUCCESS'

          statuses.append(status)

      self.assertFalse('FAILED' in statuses)


  def test_job_run(self):
      path = self.test_output_path
      statuses = []


      for filename in glob.glob(os.path.join(path, '*.json')):
          print('Evaluating: ' + filename)
          data = json.load(open(filename))
          status = data['state']['result_state']
          statuses.append(status)

      self.assertFalse('FAILED' in statuses)

if __name__ == '__main__':
  unittest.main()

As seen earlier in the unit test stage, you use pytest to run the tests and generate the result summaries.
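
The two tests above fail only when a run reports FAILED or exceeds the duration limit, so a run that timed out, was canceled, or never reached a result state is not flagged by test_job_run. If you want a stricter policy, one option is to treat anything other than SUCCESS as a problem. The sketch below illustrates that choice as an assumption, not as the behavior of the original script; the function name failed_runs is hypothetical, and the field names are the ones read by the script above.

```python
def failed_runs(runs, max_duration_ms=100000):
    """Return (run_id, reason) for every run that did not succeed in time."""
    problems = []
    for run in runs:
        # result_state is absent until the run has finished
        result = run.get("state", {}).get("result_state")
        duration = run.get("execution_duration", 0)
        if result != "SUCCESS":
            problems.append((run.get("run_id"), "result_state=%s" % result))
        elif duration > max_duration_ms:
            problems.append(
                (run.get("run_id"), "execution_duration=%s" % duration))
    return problems
```

A test could load each JSON file, pass the list to failed_runs, and assert that the result is empty, which also produces a readable list of reasons when it is not.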

Publish test results

The JSON results are archived and the test results are published to Jenkins using the junit Jenkins plugin. This enables you to visualize reports and dashboards related to the status of the build process.

stage('Report Test Results') {
  sh """find ${OUTFILEPATH} -name '*.json' -exec gzip --verbose {} \\;
        touch ${TESTRESULTPATH}/TEST-*.xml
     """
  junit "**/reports/junit/*.xml"
}
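
If you want to inspect the same result files outside Jenkins, for example while debugging a pipeline locally, a JUnit-formatted XML file can be summarized with the Python standard library. This is a minimal sketch, assuming the file has either a testsuite root element or a testsuites root containing testsuite children, with the usual tests, failures, errors, and skipped attributes. The function name summarize_junit is hypothetical.

```python
import xml.etree.ElementTree as ET


def summarize_junit(xml_text):
    """Total the test counts across all suites in a JUnit XML document."""
    root = ET.fromstring(xml_text)
    suites = [root] if root.tag == "testsuite" else root.findall("testsuite")
    totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    for suite in suites:
        for key in totals:
            # Missing attributes count as zero
            totals[key] += int(suite.get(key, 0))
    return totals
```

The function takes the XML as a string, so you would read the report file first and pass its contents in.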

At this point, the CI/CD pipeline has completed an integration and deployment cycle. By automating this process, you can ensure that your code has been tested and deployed by an efficient, consistent, and repeatable process.