databricks-logo

Create a ServiceNow ingestion pipeline

(Python)
Loading...
1
# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<Paste generated token here>"
2
# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json


notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"


headers = {
   'Authorization': 'Bearer {}'.format(api_token),
   'Content-Type': 'application/json'
}


def check_response(response):
   if response.status_code == 200:
       print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
   else:
       print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")


# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
 response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
 check_response(response)


def edit_pipeline(id: str, pipeline_definition: str):
 response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
 check_response(response)


def delete_pipeline(id: str):
 response = requests.delete(url=f"{api_url}/{id}", headers=headers)
 check_response(response)


def get_pipeline(id: str):
 response = requests.get(url=f"{api_url}/{id}", headers=headers)
 check_response(response)


def list_pipeline(filter: str = ""):
 body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
 response = requests.get(url=api_url, headers=headers, data=body)
 check_response(response)
# Update this notebook to configure your ingestion pipeline.


pipeline_spec = """
{
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
     "connection_name": "<YOUR_CONNECTON_NAME>",
     "objects": [
       {
         "table": {
           "source_schema": "default",
           "source_table": "<YOUR_SERVICENOW_TABLE>",
           "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
           "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
         }
       }, {
         "table": {
           "source_schema": "default",
           "source_table": "<YOUR_SERVICENOW_TABLE_2>",
           "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
           "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
           "table_configuration": {
                "scd_type": "SCD_TYPE_2",
                "include_columns": ["<column-a>", "<column-b>", "<column-c>"] 
            }
         }
       }
     ]
 }
}
"""


create_pipeline(pipeline_spec)
;