Ingira relatórios do Workday
Esta página descreve como ingerir relatórios do Workday e carregá-los em Databricks usando LakeFlow Connect.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
-
Seu workspace deve estar habilitado para o Unity Catalog.
-
O compute sem servidor deve estar habilitado para o seu workspace. Consulte Ativar serverless compute .
-
Se você planeja criar uma nova conexão: você deve ter privilégios
CREATE CONNECTION
na metastore.Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.
-
Se você planeja usar uma conexão existente: você deve ter privilégios
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. -
Você deve ter privilégios
USE CATALOG
no catálogo de destino. -
Você deve ter privilégios
USE SCHEMA
eCREATE TABLE
em um esquema existente ou privilégiosCREATE SCHEMA
no catálogo de destino.
Para ingerir do Workday, você deve concluir a configuração da fonte.
Configurar a rede
Se o controle de saída doserverless estiver ativado, coloque na lista de permissões os nomes de host dos URLs do seu relatório. Por exemplo, o URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json
do relatório tem o nome de host https://ww1.workday.com
. Consulte gerenciar políticas de rede para serverless controle de saída.
Opção 1: UI da Databricks
-
Na barra lateral do site Databricks workspace, clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em Workday Reports .
O assistente de ingestão é aberto.
-
Na página Ingestion pipeline (Pipeline de ingestão ) do assistente, digite um nome exclusivo para o pipeline.
-
Para o local do evento log , selecione um catálogo e um esquema para armazenar o evento pipeline log.
-
Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.
Se não houver conexões existentes com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da fonte. Você deve ter privilégios
CREATE CONNECTION
na metastore. -
Clique em Create pipeline (Criar pipeline) e continue .
-
Na página Relatório , clique em Adicionar relatório e insira o URL do relatório. Repita o procedimento para cada relatório que você deseja consumir e clique em Avançar .
-
Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.
Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOG
eCREATE SCHEMA
no catálogo principal. -
Clique em Save pipeline (Salvar pipeline) e continue .
-
(Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.
-
(Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.
-
Clique em Save e execute pipeline .
Opção 2: Databricks ativo Bundles
Esta seção descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
-
Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/workday_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/workday_job.yml
).
Veja a seguir um exemplo de arquivo
resources/workday_pipeline.yml
:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for workday_dab
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: <workday-connection>
objects:
# An array of objects to ingest from Workday. This example
# ingests a sample report about all active employees. The Employee_ID key is used as
# the primary key for the report.
- report:
source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: All_Active_Employees_Data
table_configuration:
primary_keys:
- Employee_ID
include_columns: # This can be exclude_columns instead
- <column_a>
- <column_b>
- <column_c>Veja a seguir um exemplo de arquivo
resources/workday_job.yml
:YAMLresources:
jobs:
workday_dab_job:
name: workday_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_workday.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
Opção 3: Databricks Notebook
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
-
Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.
-
Gerar tokens de acesso pessoal.
-
Cole o código a seguir em uma célula do Python Notebook, modificando o valor
<personal-access-token>
:Python# SHOULD MODIFY
# This step sets up a PAT to make API calls to the Databricks service.
api_token = "<personal-access-token>" -
Cole o código a seguir em uma segunda célula do Notebook:
Python# DO NOT MODIFY
# This step sets up a connection to make API calls to the Databricks service.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
# DO NOT MODIFY
# These are API definition to be used.
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str = ""):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response) -
Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:
Python# SHOULD MODIFY
# Update this notebook to configure your ingestion pipeline.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"report": {
"source_url": "<YOUR_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"]
}
}
}, {
"report": {
"source_url": "<YOUR_SECOND_REPORT_URL>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
"table_configuration": {
"primary_keys": ["<PRIMARY_KEY>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column_a>", "<column_b>", "<column_c>"]
}
}
}
]
}
}
"""
create_pipeline(pipeline_spec) -
executar a primeira célula do Notebook com seus tokens de acesso pessoal.
-
executar a segunda célula do Notebook.
-
Execute a terceira célula do Notebook com seus detalhes pipeline. Esta execução
create_pipeline
.list_pipeline
retorna a ID do pipeline e seus detalhes.edit_pipeline
permite que o senhor edite a definição do pipeline.delete_pipeline
exclui o pipeline.
Opção 4: Databricks CLI
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
include_columns
: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.exclude_columns
: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url
), o que permite que você ingira relatórios filtrados.
- Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.
- Execute o seguinte comando para criar o pipeline:
databricks pipelines create --json "<pipeline-definition OR json-file-path>"
definição de pipeline padrão
A seguir, é apresentada uma definição do padrão JSON pipeline :
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
começar, programar e definir alertas em seu pipeline
O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.
Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.
Exemplo: ingerir dois relatórios do Workday em esquemas separados
O exemplo de definição de pipeline nesta seção ingere dois relatórios do Workday em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>