Ingira relatórios do Workday
Esta página descreve como ingerir relatórios do Workday e carregá-los em Databricks usando LakeFlow Connect.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
- Seu workspace está habilitado para Unity Catalog.
- O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .
- Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTION
na metastore. - Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. - Você tem privilégios
USE CATALOG
no catálogo de destino. - Você tem privilégios
USE SCHEMA
eCREATE TABLE
em um esquema existente ou privilégiosCREATE SCHEMA
no catálogo de destino.
Para consumir do Workday, consulte Configurar relatórios do Workday para ingestão.
Configurar a rede
Se o controle de saída doserverless estiver ativado, coloque na lista de permissões os nomes de host dos URLs do seu relatório. Por exemplo, o URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json
do relatório tem o nome de host https://ww1.workday.com
. Consulte Gerenciamento de políticas de rede para o controle de saída serverless.
Crie uma conexão Workday
Permissões necessárias: CREATE CONNECTION
na metastore.
Para criar uma conexão Workday, faça o seguinte:
- Em seu site Databricks workspace, clique em Catalog > External locations > Connections > Create connection .
- Em Nome da conexão , insira um nome exclusivo para a conexão Workday.
- Em Tipo de conexão , selecione Relatórios de dias úteis .
- Para o tipo de autenticação , selecione OAuth refresh tokens e, em seguida, insira o ID do cliente , o segredo do cliente e os tokens de atualização que o senhor gerou durante a configuração da origem.
- Na página Criar conexão , clique em Criar .
Criar um pipeline de ingestão
Esta etapa descreve como configurar o pipeline de ingestão. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome (mas tudo em letras minúsculas) no destino, a menos que o senhor a tenha renomeado explicitamente.
- Databricks Asset Bundles
- Notebook
- CLI
Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
Você pode especificar solicitações na URL do relatório (source_url
), o que permite ingerir relatórios filtrados.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/workday_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/workday_job.yml
).
Veja a seguir um exemplo de arquivo
resources/workday_pipeline.yml
:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for workday_dab
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <workday-connection>
objects:
# An array of objects to ingest from Workday. This example
# ingests a sample report about all active employees. The Employee_ID key is used as
# the primary key for the report.
- report:
source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: All_Active_Employees_Data
table_configuration:
primary_keys:
- Employee_IDVeja a seguir um exemplo de arquivo
resources/workday_job.yml
:YAMLresources:
jobs:
workday_dab_job:
name: workday_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_workday.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
-
Gerar tokens de acesso pessoal.
-
Cole o código a seguir em uma célula do Python Notebook, modificando o valor
<personal-access-token>
:Python# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<personal-access-token>" -
Cole o código a seguir em uma segunda célula do Notebook:
Python# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str = ""):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response) -
Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:
Python# SHOULD MODIFY
# Update this notebook to configure your ingestion pipeline.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"report": {
"source_url": "<YOUR_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"]
}
}
}, {
"report": {
"source_url": "<YOUR_SECOND_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column_a>", "<column_b>", "<column_c>"]
}
}
}
]
}
}
"""
create_pipeline(pipeline_spec) -
executar a primeira célula do Notebook com seus tokens de acesso pessoal.
-
executar a segunda célula do Notebook.
-
Execute a terceira célula do Notebook com seus detalhes pipeline. Esta execução
create_pipeline
.list_pipeline
retorna a ID do pipeline e seus detalhes.edit_pipeline
permite que o senhor edite a definição do pipeline.delete_pipeline
exclui o pipeline.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
Para criar o pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
Para editar o pipeline:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
Para obter a definição do pipeline:
databricks pipelines get "<your_pipeline_id>"
Para excluir o pipeline:
databricks pipelines delete "<your_pipeline_id>"
Para obter mais informações, o senhor pode sempre executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Exemplo de definição de pipeline JSON:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
começar, programar e definir alertas em seu pipeline
O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.
Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.
Exemplo: ingerir dois relatórios do Workday em esquemas separados
O exemplo de definição de pipeline nesta seção ingere dois relatórios do Workday em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>