Crie um pipeline de ingestão do Confluence.
Visualização
O conector do Confluence está em versão Beta.
Esta página descreve como criar um pipeline de ingestão do Confluence usando Databricks LakeFlow Connect. As seguintes interfaces são suportadas:
- Databricks Asset Bundles
- APIs do Databricks
- SDKs do Databricks
- CLI do Databricks
Antes de começar
Para criar o pipeline de ingestão, você deve atender aos seguintes requisitos:
-
Seu workspace deve estar habilitado para o Unity Catalog.
-
compute sem servidor (serverless compute) deve estar habilitado para seu workspace. Consulte os requisitos compute sem servidor.
-
Se você planeja criar uma nova conexão: Você deve ter privilégios
CREATE CONNECTIONno metastore.Se o conector suportar a criação pipeline baseada em interface de usuário, um administrador poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se os usuários que criam pipelines utilizarem a criação pipeline baseada em API ou não forem administradores, um administrador deverá primeiro criar a conexão no Catalog Explorer. Consulte Conectar para gerenciar fontes de ingestão.
-
Se você planeja usar uma conexão existente: Você deve ter privilégios
USE CONNECTIONouALL PRIVILEGESno objeto de conexão. -
Você deve ter privilégios
USE CATALOGno catálogo de destino. -
Você deve ter privilégios
USE SCHEMAeCREATE TABLEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino.
Para importar dados do Confluence, consulte Configurar OAuth U2M para importação de dados do Confluence.
Crie o pipeline de ingestão.
Você precisa ter USE CONNECTION ou ALL PRIVILEGES em uma conexão para criar um pipeline de ingestão.
Este passo descreve como criar o pipeline de ingestão. Cada tabela ingerida é gravada em uma tabela de transmissão com o mesmo nome.
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recursos ao pacote:
- Um arquivo de definição de pipeline (
resources/confluence_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/confluence_job.yml).
Segue abaixo um exemplo de arquivo
resources/confluence_pipeline.yml:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for confluence_dab
resources:
pipelines:
pipeline_confluence:
name: confluence_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: confluence_connection
objects:
- table:
source_schema: default
source_table: pages
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: <table-name>Segue abaixo um exemplo de arquivo
resources/confluence_job.yml:YAMLresources:
jobs:
confluence_dab_job:
name: confluence_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_confluence.id} - Um arquivo de definição de pipeline (
-
Implante o pipeline usando a CLI Databricks :
Bashdatabricks bundle deploy
Célula 1
Esta célula inicializa o ambiente, autentica-se na API REST do Databricks e define uma função auxiliar para verificar as respostas da API. Não modifique esta célula.
import json
import requests
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
Célula 2
Esta célula define funções para interagir com a API do pipeline (criar, editar, excluir). Não modifique esta célula.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
Célula 3
Esta célula cria um pipeline de ingestão. Modifique esta célula com os detalhes do seu pipeline.
Você pode gravar em vários catálogos ou esquemas de destino. No entanto, o pipeline com múltiplos destinos não suportará a edição da interface do usuário quando esta estiver disponível.
pipeline_name = "YOUR_PIPELINE_NAME"
connection_name = "YOUR_CONNECTION_NAME"
pipeline_spec = {
"name": pipeline_name,
"ingestion_definition": {
"connection_name": connection_name,
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "pages",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "spaces",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "attachments",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "classification_levels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "labels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "blogposts",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
execute o seguinte comando:
databricks pipelines create --json "<pipeline definition or json file path>"
definição de dutos
Valores de especificação da tabela a serem modificados:
nameUm nome único para o pipeline.connection_nameA conexão do Unity Catalog que armazena os detalhes de autenticação para o Confluence.source_schema:defaultsource_table:pages,spaces,labels,classification_levels,blogposts, ouattachmentsdestination_catalog: Um nome para o catálogo de destino que conterá os dados ingeridos.destination_schema: Um nome para o esquema de destino que conterá os dados ingeridos.scd_type: O método SCD a ser usado:SCD_TYPE_1ouSCD_TYPE_2. O default é o tipo 1 SCD . Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
Especificação da mesa padrão:
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "<CONFLUENCE_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_1"
}
}
}
]
}
}
"""
Próximos passos
- começar, programar e definir alerta em seu pipeline.
Recursos adicionais
-
Padrões comuns: