Confluence取り込みパイプラインを作成する
プレビュー
Confluence コネクタはベータ版です。
このページでは、 Databricks LakeFlow Connectを使用して Confluence 取り込みパイプラインを作成する方法について説明します。 次のインターフェースがサポートされています。
- Databricksアセットバンドル
- Databricks API
- Databricks SDK
- Databricks CLI
始める前に
取り込みパイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
ワークスペースでサーバレスコンピュートを有効にする必要があります。 「サーバレス コンピュート要件」を参照してください。
-
新しい接続を作成する場合: メタストアに対する
CREATE CONNECTION権限が必要です。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMAおよびCREATE TABLE権限、またはターゲット カタログに対するCREATE SCHEMA権限が必要です。
Confluence から取り込むには、 「Confluence 取り込み用の OAuth U2M を構成する」を参照してください。
取り込みパイプラインを作成する
取り込みパイプラインを作成するには、接続にUSE CONNECTIONまたはALL PRIVILEGESが必要です。
このステップでは、インジェスト パイプラインの作成方法を説明します。 取り込まれた各テーブルは、同じ名前のストリーミング テーブルに書き込まれます。
- Databricks Asset Bundles
- Databricks notebook
- Databricks CLI
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/confluence_pipeline.yml)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/confluence_job.yml)。
以下は
resources/confluence_pipeline.ymlファイルの例です。YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for confluence_dab
resources:
pipelines:
pipeline_confluence:
name: confluence_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: confluence_connection
objects:
- table:
source_schema: default
source_table: pages
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: <table-name>以下は
resources/confluence_job.ymlファイルの例です。YAMLresources:
jobs:
confluence_dab_job:
name: confluence_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_confluence.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
セル1
このセルは環境を初期化し、Databricks REST API に対して認証し、API 応答を確認するためのヘルパー関数を定義します。このセルを変更しないでください。
import json
import requests
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
セル2
このセルは、パイプラインAPIと対話する関数 (作成、編集、削除) を定義します。 このセルを変更しないでください。
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
セル3
このセルは取り込みパイプラインを作成します。このセルをパイプラインの詳細に合わせて変更します。
複数の宛先カタログまたはスキーマに書き込むことができます。ただし、複数の宛先を持つパイプラインでは、UI 編集が利用可能になってもサポートされません。
pipeline_name = "YOUR_PIPELINE_NAME"
connection_name = "YOUR_CONNECTION_NAME"
pipeline_spec = {
"name": pipeline_name,
"ingestion_definition": {
"connection_name": connection_name,
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "pages",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "spaces",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "attachments",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "classification_levels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "labels",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
},
{
"table": {
"source_schema": "default",
"source_table": "blogposts",
"destination_catalog": "YOUR_DESTINATION_CATALOG_NAME",
"destination_schema": "YOUR_DESTINATION_SCHEMA_NAME",
"destination_table": "YOUR_DESTINATION_TABLE_NAME"
}
}
]
}
}
json_payload = json.dumps(pipeline_spec, indent=2)
create_pipeline(json_payload)
次のコマンドを実行します。
databricks pipelines create --json "<pipeline definition or json file path>"
パイプライン定義テンプレート
変更するテーブル仕様の値:
name: パイプラインの一意の名前。connection_name: Confluence の認証詳細を保存するUnity Catalog接続。source_schema:defaultsource_table:pages、spaces、labels、classification_levels、blogposts、またはattachmentsdestination_catalog: 取り込まれたデータを格納する宛先カタログの名前。destination_schema: 取り込まれたデータを格納する宛先スキーマの名前。scd_type: 使用する SCD メソッド:SCD_TYPE_1またはSCD_TYPE_2。デフォルトは SCD タイプ 1 です。詳細については、 「履歴追跡の有効化 ( SCDタイプ 2)」を参照してください。
テーブル仕様テンプレート:
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "<CONFLUENCE_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_1"
}
}
}
]
}
}
"""
次のステップ
- パイプラインを開始、スケジュールし、アラートを設定します。