Crie um pipeline de ingestão do Jira
Beta
O conector Jira está em versão Beta.
Esta página descreve como criar um pipeline de ingestão do Jira usando Databricks LakeFlow Connect. Você pode ingerir o uso de dados do Jira em um Notebook, pacotes ativos Databricks ou na CLI Databricks .
Antes de começar
Para criar um pipeline de ingestão, você deve atender aos seguintes requisitos:
-
Seu workspace deve estar habilitado para o Unity Catalog.
-
compute sem servidor (serverless compute) deve estar habilitado para seu workspace. Consulte os requisitos compute sem servidor.
-
Se você planeja criar uma nova conexão: Você deve ter privilégios
CREATE CONNECTIONno metastore.Se o conector suportar a criação pipeline baseada em interface de usuário, um administrador poderá criar a conexão e o pipeline simultaneamente, concluindo os passos desta página. No entanto, se os usuários que criam pipelines utilizarem a criação pipeline baseada em API ou não forem administradores, um administrador deverá primeiro criar a conexão no Catalog Explorer. Consulte Conectar para gerenciar fontes de ingestão.
-
Se você planeja usar uma conexão existente: Você deve ter privilégios
USE CONNECTIONouALL PRIVILEGESno objeto de conexão. -
Você deve ter privilégios
USE CATALOGno catálogo de destino. -
Você deve ter privilégios
USE SCHEMAeCREATE TABLEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino.
Para configurar o Jira para ingestão, consulte Configurar o Jira para ingestão.
Crie um pipeline de ingestão.
Permissões necessárias: USE CONNECTION ou ALL PRIVILEGES em uma conexão.
Você pode ingerir o uso de dados do Jira em um Notebook, pacotes ativos Databricks ou na CLI Databricks . Cada tabela que você especificar será inserida em uma tabela de transmissão ou em uma tabela de instantâneo, dependendo da origem. Consulte a referência do conector Jira para obter a lista completa de objetos disponíveis para ingestão.
- Databricks notebook
- Databricks Asset Bundles
- Databricks CLI
-
Copie e cole o seguinte código em uma célula do Notebook e execute-o. Não modifique nenhum trecho deste código.
Python# DO NOT MODIFY
# This sets up the API utils for creating managed ingestion pipelines in Databricks.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
def stop_pipeline(id: str):
print("cannot stop pipeline") -
Modifique as seguintes especificações pipeline padrão para atender às suas necessidades de ingestão. Em seguida, foi criada a célula de execução e um pipeline de ingestão. Você pode view o pipeline na seção "Tarefas e pipeline" em seu workspace.
Você também pode filtrar os dados opcionalmente por espaços ou projetos do Jira. Certifique-se de usar a chave exata do projeto em vez de nomes ou IDs de projeto.
(Recomendado) Segue um exemplo de ingestão de uma única tabela de origem. Consulte a referência do conector Jira para obter uma lista completa das tabelas de origem que você pode importar.
Python# Example of ingesting a single table
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "issues",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_issues",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
},
"scd_type": "SCD_TYPE_1"
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)(Recomendado) Segue um exemplo de ingestão de múltiplas tabelas de origem. Consulte a referência do conector Jira para obter uma lista completa das tabelas de origem que você pode importar.
Python# Example of ingesting multiple tables
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "default",
"source_table": "issues",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_issues",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
}
},
{
"table": {
"source_schema": "default",
"source_table": "projects",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"destination_table": "jira_projects",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)Segue abaixo um exemplo de como ingerir todas as tabelas de origem do Jira disponíveis em um único pipeline. Certifique-se de que seu aplicativo OAuth inclua todos os escopos exigidos pelo conjunto completo de tabelas e que o usuário que realiza a autenticação tenha as permissões necessárias do Jira. O pipeline falha se faltar algum escopo ou permissão necessária.
Python# Example of ingesting all source tables
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "default",
"destination_catalog": "<YOUR_CATALOG>",
"destination_schema": "<YOUR_SCHEMA>",
"jira_options": {
"include_jira_spaces": ["key1", "key2"]
}
},
"scd_type": "SCD_TYPE_1"
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte O que são pacotes Databricks ativos?.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recursos ao pacote:
- Um arquivo de definição de pipeline (
resources/jira_pipeline.yml). - Um arquivo de fluxo de trabalho que controla a frequência de ingestão de dados (
resources/jira_job.yml).
Segue abaixo um exemplo de arquivo
resources/jira_pipeline.yml:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for jira_dab
resources:
pipelines:
pipeline_jira:
name: jira_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <jira-connection>
objects:
# An array of objects to ingest from Jira. This example
# ingests the issues, projects, and status objects.
- table:
source_schema: objects
source_table: issues
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: projects
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: status
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}Segue abaixo um exemplo de arquivo
resources/jira_job.yml:YAMLresources:
jobs:
jira_dab_job:
name: jira_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_jira.id} - Um arquivo de definição de pipeline (
Para criar o pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
Para atualizar o pipeline:
databricks pipelines update --json "<pipeline-definition | json-file-path>"
Para obter a definição do pipeline:
databricks pipelines get "<pipeline-id>"
Para excluir o pipeline:
databricks pipelines delete "<pipeline-id>"
Para mais informações, você pode executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Recursos adicionais
-
Padrões comuns: