Ingira relatórios do Workday
Este artigo descreve como ingerir relatórios do Workday e carregá-los em Databricks usando LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e DLT.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTION
na metastore.Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. -
Você tem privilégios
USE CATALOG
no catálogo de destino. -
Você tem privilégios
USE SCHEMA
eCREATE TABLE
em um esquema existente ou privilégiosCREATE SCHEMA
no catálogo de destino.
Para consumir do Workday, consulte Configurar relatórios do Workday para ingestão.
Crie uma conexão Workday
Permissões necessárias: CREATE CONNECTION
na metastore.
Para criar uma conexão Workday, faça o seguinte:
- Em seu site Databricks workspace, clique em Catalog > External locations > Connections > Create connection .
- Em Nome da conexão , insira um nome exclusivo para a conexão Workday.
- Em Tipo de conexão , selecione Relatórios de dias úteis .
- Para o tipo de autenticação , selecione OAuth refresh tokens e, em seguida, insira o ID do cliente , o segredo do cliente e os tokens de atualização que o senhor gerou durante a configuração da origem.
- Na página Criar conexão , clique em Criar .
Criar um pipeline de ingestão
Esta etapa descreve como configurar o pipeline de ingestão. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome (mas tudo em letras minúsculas) no destino, a menos que o senhor a tenha renomeado explicitamente.
- Databricks Asset Bundles
- Notebook
- CLI
Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/workday_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/workday_job.yml
).
Veja a seguir um exemplo de arquivo
resources/workday_pipeline.yml
:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for workday_dab
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <workday-connection>
objects:
# An array of objects to ingest from Workday. This example
# ingests a sample report about all active employees. The Employee_ID key is used as
# the primary key for the report.
- report:
source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: All_Active_Employees_Data
table_configuration:
primary_keys:
- Employee_ID
include_columns: # This can be exclude_columns instead
- <column_a>
- <column_b>
- <column_c>Veja a seguir um exemplo de arquivo
resources/workday_job.yml
:YAMLresources:
jobs:
workday_dab_job:
name: workday_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_workday.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
-
Gerar tokens de acesso pessoal.
-
Cole o código a seguir em uma célula do Python Notebook, modificando o valor
<personal-access-token>
:Python# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<personal-access-token>" -
Cole o código a seguir em uma segunda célula do Notebook:
Python# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str = ""):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response) -
Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:
Python# SHOULD MODIFY
# Update this notebook to configure your ingestion pipeline.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"report": {
"source_url": "<YOUR_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"]
}
}
}, {
"report": {
"source_url": "<YOUR_SECOND_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column_a>", "<column_b>", "<column_c>"]
}
}
}
]
}
}
"""
create_pipeline(pipeline_spec) -
executar a primeira célula do Notebook com seus tokens de acesso pessoal.
-
executar a segunda célula do Notebook.
-
Execute a terceira célula do Notebook com seus detalhes pipeline. Esta execução
create_pipeline
.list_pipeline
retorna a ID do pipeline e seus detalhes.edit_pipeline
permite que o senhor edite a definição do pipeline.delete_pipeline
exclui o pipeline.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
Para criar o pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
Para editar o pipeline:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
Para obter a definição do pipeline:
databricks pipelines get "<your_pipeline_id>"
Para excluir o pipeline:
databricks pipelines delete "<your_pipeline_id>"
Para obter mais informações, o senhor pode sempre executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Exemplo de definição de pipeline JSON:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
começar, programar e definir alertas em seu pipeline
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.
Exemplo: ingerir dois relatórios do Workday em esquemas separados
O exemplo de definição de pipeline nesta seção ingere dois relatórios do Workday em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>