# Create a Jira ingestion pipeline
The Jira connector is in Beta.
This page describes how to create a Jira ingestion pipeline using Databricks Lakeflow Connect. You can ingest Jira data using a notebook, Databricks Asset Bundles, or the Databricks CLI.
## Before you begin
To create an ingestion pipeline, you must meet the following requirements:
- Your workspace must be enabled for Unity Catalog.
- Serverless compute must be enabled for your workspace. See Serverless compute requirements.
- If you plan to create a new connection: You must have `CREATE CONNECTION` privileges on the metastore. If the connector supports UI-based pipeline authoring, an admin can create the connection and the pipeline at the same time by completing the steps on this page. However, if the users who create pipelines use API-based pipeline authoring or are non-admin users, an admin must first create the connection in Catalog Explorer. See Connect to managed ingestion sources.
- If you plan to use an existing connection: You must have `USE CONNECTION` privileges or `ALL PRIVILEGES` on the connection object.
- You must have `USE CATALOG` privileges on the target catalog.
- You must have `USE SCHEMA` and `CREATE TABLE` privileges on an existing schema, or `CREATE SCHEMA` privileges on the target catalog.
To configure Jira for ingestion, see Configure Jira for ingestion.
## Create an ingestion pipeline
Permissions required: `USE CONNECTION` or `ALL PRIVILEGES` on a connection.
You can ingest Jira data using a notebook, Databricks Asset Bundles, or the Databricks CLI. Each table you specify is ingested into a streaming table or a snapshot table, depending on the source. See Jira connector reference for the full list of objects that are available to ingest.
### Databricks notebook
1. Copy and paste the following code into a notebook cell and run the code. Do not modify any of this code.
```python
# DO NOT MODIFY
# This sets up the API utils for creating managed ingestion pipelines in Databricks.

import requests
import json

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
}

def check_response(response):
    if response.status_code == 200:
        print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
        print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)

def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def list_pipeline(filter: str):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)

def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
    body = f"""
    {{
      "full_refresh": {str(full_refresh).lower()},
      "validate_only": false,
      "cause": "API_CALL"
    }}
    """
    response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
    check_response(response)

def stop_pipeline(id: str):
    print("cannot stop pipeline")
```
2. Modify the following pipeline specification template to fit your ingestion needs, then run the cell to create the ingestion pipeline. You can view the pipeline in the Jobs & Pipelines section of your workspace.

You can also optionally filter the data by Jira spaces or projects. Make sure you use exact project keys, not project names or IDs.
(Recommended) The following is an example of ingesting a single source table. See Jira connector reference for a full list of source tables you can ingest.
```python
# Example of ingesting a single table
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "default",
          "source_table": "issues",
          "destination_catalog": "<YOUR_CATALOG>",
          "destination_schema": "<YOUR_SCHEMA>",
          "destination_table": "jira_issues",
          "jira_options": {
            "include_jira_spaces": ["key1", "key2"]
          }
        },
        "scd_type": "SCD_TYPE_1"
      }
    ]
  },
  "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
```

(Recommended) The following is an example of ingesting multiple source tables. See Jira connector reference for a full list of source tables you can ingest.
```python
# Example of ingesting multiple tables
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "default",
          "source_table": "issues",
          "destination_catalog": "<YOUR_CATALOG>",
          "destination_schema": "<YOUR_SCHEMA>",
          "destination_table": "jira_issues",
          "jira_options": {
            "include_jira_spaces": ["key1", "key2"]
          }
        }
      },
      {
        "table": {
          "source_schema": "default",
          "source_table": "projects",
          "destination_catalog": "<YOUR_CATALOG>",
          "destination_schema": "<YOUR_SCHEMA>",
          "destination_table": "jira_projects",
          "jira_options": {
            "include_jira_spaces": ["key1", "key2"]
          }
        }
      }
    ]
  },
  "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
```

The following is an example of ingesting all available Jira source tables in one pipeline. Ensure that your OAuth application includes all scopes required by the full table set and that the authenticating user has the necessary Jira permissions. Pipelines fail if any required scope or permission is missing.
```python
# Example of ingesting all source tables
pipeline_spec = """
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "schema": {
          "source_schema": "default",
          "destination_catalog": "<YOUR_CATALOG>",
          "destination_schema": "<YOUR_SCHEMA>",
          "jira_options": {
            "include_jira_spaces": ["key1", "key2"]
          }
        },
        "scd_type": "SCD_TYPE_1"
      }
    ]
  },
  "channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
```
### Databricks Asset Bundles

Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see What are Databricks Asset Bundles?.
1. Create a new bundle using the Databricks CLI:

```bash
databricks bundle init
```
2. Add two new resource files to the bundle:

   - A pipeline definition file (`resources/jira_pipeline.yml`).
   - A workflow file that controls the frequency of data ingestion (`resources/jira_job.yml`).
The following is an example `resources/jira_pipeline.yml` file:

```yaml
variables:
  dest_catalog:
    default: main
  dest_schema:
    default: ingest_destination_schema

# The main pipeline for jira_dab
resources:
  pipelines:
    pipeline_jira:
      name: jira_pipeline
      catalog: ${var.dest_catalog}
      schema: ${var.dest_schema}
      ingestion_definition:
        connection_name: <jira-connection>
        objects:
          # An array of objects to ingest from Jira. This example
          # ingests the issues, projects, and status objects.
          - table:
              source_schema: objects
              source_table: issues
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - table:
              source_schema: objects
              source_table: projects
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
          - table:
              source_schema: objects
              source_table: status
              destination_catalog: ${var.dest_catalog}
              destination_schema: ${var.dest_schema}
```

The following is an example `resources/jira_job.yml` file:

```yaml
resources:
  jobs:
    jira_dab_job:
      name: jira_dab_job

      trigger:
        # Run this job every day, exactly one day from the last run
        # See https://docs.databricks.com/api/workspace/jobs/create#trigger
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure:
          - <email-address>

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.pipeline_jira.id}
```
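With both resource files in place, you can validate, deploy, and run the bundle using the Databricks CLI. The following is a minimal sketch, assuming you run it from the bundle root against the default target in your bundle configuration and use the `jira_dab_job` resource key from the example above:

```bash
# Check the bundle configuration for errors before deploying
databricks bundle validate

# Deploy the pipeline and job definitions to the target workspace
databricks bundle deploy

# Trigger the job that refreshes the ingestion pipeline
databricks bundle run jira_dab_job
```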
### Databricks CLI

To create the pipeline:

```bash
databricks pipelines create --json "<pipeline-definition | json-file-path>"
```

To update the pipeline:

```bash
databricks pipelines update --json "<pipeline-definition | json-file-path>"
```

To get the pipeline definition:

```bash
databricks pipelines get "<pipeline-id>"
```

To delete the pipeline:

```bash
databricks pipelines delete "<pipeline-id>"
```

For more information, you can run:

```bash
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
```
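As an illustration, the following is a minimal sketch of creating the Jira pipeline from a JSON definition file. The file contents mirror the single-table notebook example above; the file name and the values in angle brackets are placeholders:

```bash
# Write a pipeline definition to a file. Values in angle brackets are placeholders.
cat > jira-pipeline.json <<'EOF'
{
  "name": "<YOUR_PIPELINE_NAME>",
  "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTION_NAME>",
    "objects": [
      {
        "table": {
          "source_schema": "default",
          "source_table": "issues",
          "destination_catalog": "<YOUR_CATALOG>",
          "destination_schema": "<YOUR_SCHEMA>",
          "destination_table": "jira_issues"
        },
        "scd_type": "SCD_TYPE_1"
      }
    ]
  },
  "channel": "PREVIEW"
}
EOF

# Pass the definition to the CLI as inline JSON
databricks pipelines create --json "$(cat jira-pipeline.json)"
```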