Create a Confluence ingestion pipeline
The Confluence connector is in Beta.
This page describes how to create a Confluence ingestion pipeline using Databricks Lakeflow Connect. The following interfaces are supported:
- Databricks Asset Bundles
- Databricks APIs
- Databricks SDKs
- Databricks CLI
Before you begin
To create the ingestion pipeline, you must meet the following requirements:
- Your workspace must be enabled for Unity Catalog.
- Serverless compute must be enabled for your workspace. See Serverless compute requirements.
- If you plan to create a new connection: You must have `CREATE CONNECTION` privileges on the metastore. If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
- If you plan to use an existing connection: You must have `USE CONNECTION` privileges or `ALL PRIVILEGES` on the connection object.
- You must have `USE CATALOG` privileges on the target catalog.
- You must have `USE SCHEMA` and `CREATE TABLE` privileges on an existing schema, or `CREATE SCHEMA` privileges on the target catalog.
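If you need to grant these privileges, a metastore admin or the object owner can do so with SQL. The following is a minimal sketch run from a notebook; the connection, catalog, schema, and principal names are placeholders, not values required by this guide:

```python
# Hypothetical names: replace the connection, catalog, schema, and principal
# with your own before running.
principal = "data-engineers"

spark.sql(f"GRANT USE CONNECTION ON CONNECTION confluence_connection TO `{principal}`")
spark.sql(f"GRANT USE CATALOG ON CATALOG main TO `{principal}`")
spark.sql(f"GRANT USE SCHEMA, CREATE TABLE ON SCHEMA main.ingest_destination_schema TO `{principal}`")
```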
To ingest from Confluence, see Configure OAuth U2M for Confluence ingestion.
Create the ingestion pipeline
You must have `USE CONNECTION` or `ALL PRIVILEGES` on a connection to create an ingestion pipeline.
This step describes how to create the ingestion pipeline. Each ingested table is written to a streaming table with the same name.
You can create the pipeline using Databricks Asset Bundles, a Databricks notebook, or the Databricks CLI. Follow the steps for the interface that you want to use.
Databricks Asset Bundles
- Create a new bundle using the Databricks CLI:

  ```bash
  databricks bundle init
  ```

- Add two new resource files to the bundle:

  - A pipeline definition file (`resources/confluence_pipeline.yml`).
  - A workflow file that controls the frequency of data ingestion (`resources/confluence_job.yml`).

  The following is an example `resources/confluence_pipeline.yml` file:

  ```yaml
  variables:
    dest_catalog:
      default: main
    dest_schema:
      default: ingest_destination_schema

  # The main pipeline for confluence_dab
  resources:
    pipelines:
      pipeline_confluence:
        name: confluence_pipeline
        catalog: ${var.dest_catalog}
        target: ${var.dest_schema}
        ingestion_definition:
          connection_name: confluence_connection
          objects:
            - table:
                source_schema: default
                source_table: pages
                destination_catalog: ${var.dest_catalog}
                destination_schema: ${var.dest_schema}
                destination_table: <table-name>
  ```

  The following is an example `resources/confluence_job.yml` file:

  ```yaml
  resources:
    jobs:
      confluence_dab_job:
        name: confluence_dab_job

        trigger:
          # Run this job every day, exactly one day from the last run
          # See https://docs.databricks.com/api/workspace/jobs/create#trigger
          periodic:
            interval: 1
            unit: DAYS

        email_notifications:
          on_failure:
            - <email-address>

        tasks:
          - task_key: refresh_pipeline
            pipeline_task:
              pipeline_id: ${resources.pipelines.pipeline_confluence.id}
  ```

- Deploy the pipeline using the Databricks CLI:

  ```bash
  databricks bundle deploy
  ```
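After the bundle is deployed, the scheduled job triggers the pipeline at the configured interval. You can also start an update directly. The following is a minimal sketch using the Databricks SDK for Python (one of the supported interfaces); it assumes default authentication is configured in your environment and uses the pipeline name from the example YAML above:

```python
# Minimal sketch: start an update of the deployed pipeline with the
# Databricks SDK for Python. Assumes default authentication (for example,
# a configured Databricks CLI profile or workspace environment variables).
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Look up the pipeline created by the bundle (name from the example YAML above).
for p in w.pipelines.list_pipelines(filter="name LIKE 'confluence_pipeline'"):
    # Trigger an incremental update; pass full_refresh=True to re-ingest everything.
    update = w.pipelines.start_update(pipeline_id=p.pipeline_id)
    print(f"Started update {update.update_id} for pipeline {p.pipeline_id}")
```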
Databricks notebook
To create the pipeline from a Databricks notebook, run the following cells, modifying them only where indicated.
Cell 1
This cell initializes the environment, authenticates to the Databricks REST API, and defines a helper function to check API responses. Do not modify this cell.
```python
import json
import requests

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
}

def check_response(response):
    if response.status_code == 200:
        print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
        print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
```
Cell 2
This cell defines functions to interact with the Pipelines API (create, edit, delete). Do not modify this cell.
```python
def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)

def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def list_pipeline(filter: str):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)

def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
    body = f"""
    {{
      "full_refresh": {str(full_refresh).lower()},
      "validate_only": false,
      "cause": "API_CALL"
    }}
    """
    response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
    check_response(response)
```
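For example, before creating a new pipeline you can list the pipelines that already exist in the workspace with the helper defined above (the filter string follows the Pipelines API filter syntax; the name pattern here is only an illustration):

```python
# List all pipelines in the workspace.
list_pipeline("")

# Or list only pipelines whose name matches a pattern (illustrative pattern).
list_pipeline("name LIKE '%confluence%'")
```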
Cell 3
This cell creates an ingestion pipeline. Modify this cell with the details of your pipeline.
You can write to multiple destination catalogs or schemas. However, pipelines that write to multiple destinations can't be edited in the UI when UI-based editing becomes available.
```python
pipeline_name = "YOUR_PIPELINE_NAME"
connection_name = "YOUR_CONNECTION_NAME"

pipeline_spec = {
    "name": pipeline_name,
    "ingestion_definition": {
        "connection_name": connection_name,
        "objects": [
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "pages",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "spaces",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "attachments",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "classification_levels",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "labels",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            },
            {
                "table": {
                    "source_schema": "default",
                    "source_table": "blogposts",
                    "destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
                    "destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
                    "destination_table": "YOUR_DESTINATION_TABLE_NAME"
                }
            }
        ]
    }
}

json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
```
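If the create call succeeds, the response printed by check_response includes the new pipeline's pipeline_id. You can then use the helpers from Cell 2 to inspect the pipeline and trigger its first update, for example:

```python
# Paste the pipeline_id value from the create response above.
pipeline_id = "PASTE_YOUR_PIPELINE_ID_HERE"

# Inspect the pipeline definition and state.
get_pipeline(pipeline_id)

# Trigger an update; set full_refresh=True to re-ingest all data.
start_pipeline(pipeline_id, full_refresh=False)
```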
Databricks CLI
Run the following command:

```bash
databricks pipelines create --json "<pipeline definition or json file path>"
```
Pipeline definition template
Table spec values to modify:

- `name`: A unique name for the pipeline.
- `connection_name`: The Unity Catalog connection that stores the authentication details for Confluence.
- `source_schema`: `default`
- `source_table`: `pages`, `spaces`, `labels`, `classification_levels`, `blogposts`, or `attachments`
- `destination_catalog`: A name for the destination catalog that will contain the ingested data.
- `destination_schema`: A name for the destination schema that will contain the ingested data.
- `scd_type`: The SCD method to use: `SCD_TYPE_1` or `SCD_TYPE_2`. The default is SCD type 1. For more information, see Enable history tracking (SCD type 2).
Table spec template:

```python
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "default",
          "source_table": "<CONFLUENCE_TABLE_NAME>",
          "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
          "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
          "table_configuration": {
            "scd_type": "SCD_TYPE_1"
          }
        }
      }
    ]
  }
}
"""
```
Next steps
- Start, schedule, and set alerts on your pipeline.