Jira 取り込みパイプラインを作成する
ベータ版
Jira コネクタはベータ版です。
このページでは、 Databricks LakeFlow Connectを使用して Jira インジェスト パイプラインを作成する方法について説明します。 ノートブック、Databricks アセット バンドル、または Databricks CLI のいずれかを使用して Jira データを取り込むことができます。
始める前に
取り込み パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
ワークスペースでサーバレスコンピュートを有効にする必要があります。 「サーバレス コンピュート要件」を参照してください。
-
新しい接続を作成する場合: メタストアに対する
CREATE CONNECTION権限が必要です。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMAおよびCREATE TABLE権限、またはターゲット カタログに対するCREATE SCHEMA権限が必要です。
Jira を取り込み用に構成するには、 「Jira を取り込み用に構成する」を参照してください。
取り込みパイプラインを作成する
必要な権限: 接続時にUSE CONNECTIONまたはALL PRIVILEGES 。
ノートブック、Databricks アセット バンドル、または Databricks CLI のいずれかを使用して Jira データを取り込むことができます。指定した各テーブルは、ソースに応じてストリーミング テーブルまたはスナップショット テーブルに取り込まれます。 取り込み可能なオブジェクトの完全なリストについては、 Jira コネクタ リファレンスを参照してください。
- Databricks notebook
- Databricks Asset Bundles
- Databricks CLI
-
次のコードをコピーしてノートブックのセルに貼り付け、コードを実行します。このコードは変更しないでください。
Python# DO NOT MODIFY
# This sets up the API utils for creating managed ingestion pipelines in Databricks.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
def stop_pipeline(id: str):
print("cannot stop pipeline") -
取り込みのニーズに合わせて、次のパイプライン仕様テンプレートを変更します。次に、セルを実行すると、取り込みパイプラインが作成されます。 ワークスペースの「ジョブとパイプライン」 セクションでパイプラインを表示できます。
オプションで、Jira スペースまたはプロジェクト別にデータをフィルタリングすることもできます。プロジェクト名や ID ではなく、正確なプロジェクト キーを使用していることを確認してください。
(推奨) 以下は、単一のソース テーブルを取り込む例です。取り込むことができるソース テーブルの完全なリストについては、 Jira コネクタ リファレンスを参照してください。
Python# Example of ingesting a single table
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "issues",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_issues",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
},
"scd_type": "SCD_TYPE_1"
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)(推奨) 複数のソース テーブルを取り込む例を次に示します。取り込むことができるソース テーブルの完全なリストについては、 Jira コネクタ リファレンスを参照してください。
Python# Example of ingesting multiple tables
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "issues",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_issues",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
}
},
{
"table": {
"source_schema": "default",
"source_table": "projects",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_projects",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)以下は、利用可能なすべての Jira ソース テーブルを 1 つのパイプラインに取り込む例です。OAuth アプリケーションに完全なテーブル セットに必要なすべてのスコープが含まれていること、および認証ユーザーに必要な Jira 権限があることを確認します。必要なスコープまたは権限が不足している場合、パイプラインは失敗します。
Python# Example of ingesting all source tables
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "default",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
},
"scd_type": "SCD_TYPE_1"
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 Databricksアセット バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/jira_pipeline.yml)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/jira_job.yml)。
以下は
resources/jira_pipeline.ymlファイルの例です。YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for jira_dab
resources:
pipelines:
pipeline_jira:
name: jira_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <jira-connection>
objects:
# An array of objects to ingest from Jira. This example
# ingests the issues, projects, and status objects.
- table:
source_schema: objects
source_table: issues
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: projects
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: status
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}以下は
resources/jira_job.ymlファイルの例です。YAMLresources:
jobs:
jira_dab_job:
name: jira_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_jira.id} - パイプライン定義ファイル (
パイプラインを作成するには:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
パイプラインを更新するには:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
パイプライン定義を取得するには:
databricks pipelines get "<pipeline-id>"
パイプラインを削除するには:
databricks pipelines delete "<pipeline-id>"
詳細については、次のコマンドを実行してください。
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help