
    Create a Microsoft SharePoint ingestion pipeline (Python)
    # DO NOT MODIFY
    
    
    # This sets up the API utils for creating managed ingestion pipelines in Databricks.
    
    
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    api_token = notebook_context.apiToken().get()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
        'Authorization': f'Bearer {api_token}',
        'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
        if response.status_code == 200:
            print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
        else:
            print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    
    
    def create_pipeline(pipeline_definition: str):
      response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
      check_response(response)
    
    
    
    
    def edit_pipeline(id: str, pipeline_definition: str): 
      response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
      check_response(response)
    
    
    
    
    def delete_pipeline(id: str): 
      response = requests.delete(url=f"{api_url}/{id}", headers=headers)
      check_response(response)
    
    
    
    
    def list_pipeline(filter: str):
      # The list endpoint takes "filter" as a query parameter, not a request body.
      params = {} if len(filter) == 0 else {"filter": filter}
      response = requests.get(url=api_url, headers=headers, params=params)
      check_response(response)
    
    
    
    
    def get_pipeline(id: str):
      response = requests.get(url=f"{api_url}/{id}", headers=headers)
      check_response(response)
    
    
    
    
    def start_pipeline(id: str, full_refresh: bool=False):
      body = f"""
      {{
        "full_refresh": {str(full_refresh).lower()},
        "validate_only": false,
        "cause": "API_CALL"
      }}
      """
      response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
      check_response(response)
    
    
    
    
    def stop_pipeline(id: str):
      # Stopping a running update is not supported by this notebook's helpers.
      print("cannot stop pipeline")
    
    
    # Do not modify the PREVIEW channel in the code below. 
    
    
    # If you want to ingest all drives in your SharePoint site, use the schema spec. If you want to ingest only some drives in your SharePoint site, use the table spec.
    
    
    # By default, the API uses SCD type 1, which overwrites a file's record in the destination whenever the file is edited in the source. If you prefer to preserve historical file versions, set "scd_type": "SCD_TYPE_2" in the table_configuration.
    
    
    # schema spec:
    
    
    pipeline_spec = """
    {
     "name": "<YOUR_PIPELINE_NAME>",
     "ingestion_definition": {
         "connection_name": "<YOUR_CONNECTON_NAME>",
         "objects": [
            {
              "schema": {
                "source_schema": "<YOUR_SHAREPOINT_SITE_ID>", 
                "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
                "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
                "table_configuration": {
                  "scd_type": "SCD_TYPE_1"
                }
              }
            }
          ]
     },
     "channel": "PREVIEW"
    }
    """
    
    
    
    
    create_pipeline(pipeline_spec)
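

    # Optional: a minimal sketch of the same schema spec with SCD type 2 instead, which preserves
    # historical file versions rather than overwriting them. Only the "scd_type" value changes;
    # uncomment the create call if you actually want this variant.


    pipeline_spec_scd2 = pipeline_spec.replace('"SCD_TYPE_1"', '"SCD_TYPE_2"')
    # create_pipeline(pipeline_spec_scd2)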
    
    
    # Do not modify the PREVIEW channel in the code below.


    # Use the table spec below if you only want to ingest specific drives in your SharePoint site. As with the schema spec above, the default is SCD type 1; set "scd_type": "SCD_TYPE_2" in the table_configuration to preserve historical file versions.
    
    
    # table spec (set destination_table to the name you want for the destination table, e.g., "my_drive"):
    
    
    pipeline_spec = """
    {
     "name": "<YOUR_PIPELINE_NAME>",
     "ingestion_definition": {
         "connection_name": "<YOUR_CONNECTON_NAME>",
         "objects": [
            {
              "table": {
                "source_schema": "<YOUR_SHAREPOINT_SITE_ID>", 
                "source_table": "<YOUR_SHAREPOINT_DRIVE_NAME>",
                "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
                "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
                "destination_table": "<NAME"> # e.g., "my_drive",
                "table_configuration": {
                  "scd_type": "SCD_TYPE_1"
                }
              }
            }
          ]
     },
     "channel": "PREVIEW"
    }
    """
    
    
    
    
    create_pipeline(pipeline_spec)
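

    # Once the create call above succeeds, its response includes a "pipeline_id". As a sketch (the
    # ID below is a placeholder, not a real value), you can then inspect and trigger the pipeline
    # with the helper functions defined at the top of this notebook:


    pipeline_id = "<YOUR_PIPELINE_ID>"  # copy this from the create_pipeline response


    get_pipeline(pipeline_id)                        # show the pipeline definition and state
    start_pipeline(pipeline_id, full_refresh=False)  # trigger an incremental ingestion update
    # start_pipeline(pipeline_id, full_refresh=True) # or re-ingest everything from SharePoint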
    
    