databricks-logo

Create a Google Analytics ingestion pipeline

(Python)
Loading...
1
# SHOULD MODIFY
# This step sets up the personal access token (PAT) used to authenticate API
# calls to the Databricks service; it is sent as the Bearer token in the
# Authorization header of every request made by this notebook.
# NOTE(review): replace the placeholder with your own token and avoid sharing
# or committing the notebook with a real token in it.
api_token = "<Paste generated token here>"
2
# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json


# Look up the API base URL of the current workspace from the notebook context.
notebook_context = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext()
)
workspace_url = notebook_context.apiUrl().get()

# Base endpoint shared by every pipeline helper defined in this notebook.
api_url = "{}/api/2.0/pipelines".format(workspace_url)

# Headers attached to every request: bearer-token auth plus a JSON body type.
headers = {
    "Authorization": f"Bearer {api_token}",
    "Content-Type": "application/json",
}


def check_response(response):
    """Print the outcome of an API call in a human-readable form.

    On HTTP 200 the JSON body is pretty-printed. For any other status code
    the status and an error message are printed; the message is the
    ``message`` field of the JSON body when there is one, otherwise the raw
    response text.

    Args:
        response: A ``requests.Response`` (or any object exposing
            ``status_code``, ``text`` and ``json()``).

    Returns:
        None. The result is only printed.
    """
    # Parse the body once. Error responses are not always JSON (for example
    # an HTML page from a proxy), and json() raises a ValueError subclass in
    # that case, so fall back to the raw text instead of crashing.
    try:
        payload = response.json()
        has_json = True
    except ValueError:
        payload = None
        has_json = False

    if response.status_code == 200:
        if has_json:
            rendered = json.dumps(payload, indent=2, sort_keys=False)
        else:
            rendered = response.text
        print("Response from API:\n{}".format(rendered))
        return

    # Only a JSON object can carry a "message" field; anything else (list,
    # string, unparsable body) is reported through the raw text.
    if isinstance(payload, dict):
        message = payload.get("message", response.text)
    else:
        message = response.text
    print(
        f"Failed to retrieve data: error_code={response.status_code}, "
        f"error_message={message}"
    )


# DO NOT MODIFY
# These are the API definitions to be used.
def create_pipeline(pipeline_definition: str):
    """Create a pipeline from a JSON definition.

    POSTs ``pipeline_definition`` as the request body to the pipelines
    endpoint and prints the outcome via ``check_response``.
    """
    check_response(
        requests.post(url=api_url, headers=headers, data=pipeline_definition)
    )


def edit_pipeline(id: str, pipeline_definition: str):
    """Replace the definition of an existing pipeline.

    PUTs ``pipeline_definition`` as the request body to the endpoint of the
    pipeline identified by ``id`` and prints the outcome via
    ``check_response``.
    """
    pipeline_url = "{}/{}".format(api_url, id)
    result = requests.put(
        url=pipeline_url,
        headers=headers,
        data=pipeline_definition,
    )
    check_response(result)


def delete_pipeline(id: str):
    """Delete the pipeline identified by ``id``.

    Sends a DELETE request to the pipeline's endpoint and prints the outcome
    via ``check_response``.
    """
    pipeline_url = "{}/{}".format(api_url, id)
    check_response(requests.delete(url=pipeline_url, headers=headers))


def get_pipeline(id: str):
    """Fetch and print the pipeline identified by ``id``.

    Sends a GET request to the pipeline's endpoint and prints the outcome via
    ``check_response``.
    """
    pipeline_url = "{}/{}".format(api_url, id)
    result = requests.get(url=pipeline_url, headers=headers)
    check_response(result)


def list_pipeline(filter: str = ""):
    """List pipelines and print the API response.

    Args:
        filter: Optional filter expression. When non-empty it is combined
            with ``pipeline_type IN ('MANAGED_INGESTION')`` and sent as the
            ``filter`` field of a JSON request body. When empty, the request
            is sent with an empty body, so no filter at all is applied
            (including the managed-ingestion restriction).
    """
    if filter:
        # Serialize with json.dumps rather than string interpolation so that
        # quotes or backslashes inside the filter cannot produce invalid JSON.
        # For filters without such characters the body is unchanged.
        body = json.dumps(
            {"filter": f"{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}
        )
    else:
        body = ""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
# OK to modify everything other than the channel.
# Update this notebook to configure your ingestion pipeline.


# JSON definition of the ingestion pipeline, sent verbatim as the request body
# when the pipeline is created. Replace every <PLACEHOLDER> with your own
# value. Each entry in "objects" describes one source table and the catalog
# and schema it is written to; the first entry also sets an SCD type through
# "table_configuration".
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_catalog": "<YOUR_GCP_PROJECT_ID>",
          "source_schema": "<YOUR_ANALYTICS_PROPERTY_NAME>",
          "source_table": "<YOUR_FIRST_TABLE>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "scd_type": "SCD_TYPE_2"
          }
        }
      },
      {
        "table": {
          "source_catalog": "<YOUR_GCP_PROJECT_ID>",
          "source_schema": "<YOUR_ANALYTICS_PROPERTY_NAME>",
          "source_table": "<YOUR_SECOND_TABLE>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
        }
      }
    ]
  }
}
"""


# Submit the pipeline definition; create_pipeline prints the API response.
create_pipeline(pipeline_spec)