Pular para o conteúdo principal

Ingira relatórios do Workday

Esta página descreve como ingerir relatórios do Workday e carregá-los em Databricks usando LakeFlow Connect.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace deve estar habilitado para o Unity Catalog.

  • O compute sem servidor deve estar habilitado para o seu workspace. Consulte Ativar serverless compute .

  • Se você planeja criar uma nova conexão: você deve ter privilégios CREATE CONNECTION na metastore.

    Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.

  • Se você planeja usar uma conexão existente: você deve ter privilégios USE CONNECTION ou ALL PRIVILEGES no objeto de conexão.

  • Você deve ter privilégios USE CATALOG no catálogo de destino.

  • Você deve ter privilégios USE SCHEMA e CREATE TABLE em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

Para ingerir do Workday, você deve concluir a configuração da fonte.

Configurar a rede

Se o controle de saída doserverless estiver ativado, coloque na lista de permissões os nomes de host dos URLs do seu relatório. Por exemplo, o URL https://ww1.workday.com/service/ccx/<tenant>/<reportName>?format=json do relatório tem o nome de host https://ww1.workday.com. Consulte gerenciar políticas de rede para serverless controle de saída.

Opção 1: UI da Databricks

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em Workday Reports .

    O assistente de ingestão é aberto.

  3. Na página Ingestion pipeline (Pipeline de ingestão ) do assistente, digite um nome exclusivo para o pipeline.

  4. Para o local do evento log , selecione um catálogo e um esquema para armazenar o evento pipeline log.

  5. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.

    Se não houver conexões existentes com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da fonte. Você deve ter privilégios CREATE CONNECTION na metastore.

  6. Clique em Create pipeline (Criar pipeline) e continue .

  7. Na página Relatório , clique em Adicionar relatório e insira o URL do relatório. Repita o procedimento para cada relatório que você deseja consumir e clique em Avançar .

  8. Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.

    Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo principal.

  9. Clique em Save pipeline (Salvar pipeline) e continue .

  10. (Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.

  11. (Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.

  12. Clique em Save e execute pipeline .

Opção 2: Databricks ativo Bundles

Esta seção descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.

O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:

  • include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
  • exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.

Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.

  1. Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.

  2. Crie um novo pacote usando a CLI do Databricks:

    Bash
    databricks bundle init
  3. Adicione dois novos arquivos de recurso ao pacote:

    • Um arquivo de definição de pipeline (resources/workday_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (resources/workday_job.yml).

    Veja a seguir um exemplo de arquivo resources/workday_pipeline.yml:

    YAML
    variables:
    dest_catalog:
    default: main
    dest_schema:
    default: ingest_destination_schema

    # The main pipeline for workday_dab
    resources:
    pipelines:
    pipeline_workday:
    name: workday_pipeline
    catalog: ${var.dest_catalog}
    schema: ${var.dest_schema}
    ingestion_definition:
    connection_name: <workday-connection>
    objects:
    # An array of objects to ingest from Workday. This example
    # ingests a sample report about all active employees. The Employee_ID key is used as
    # the primary key for the report.
    - report:
    source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    destination_table: All_Active_Employees_Data
    table_configuration:
    primary_keys:
    - Employee_ID
    include_columns: # This can be exclude_columns instead
    - <column_a>
    - <column_b>
    - <column_c>

    Veja a seguir um exemplo de arquivo resources/workday_job.yml:

    YAML
    resources:
    jobs:
    workday_dab_job:
    name: workday_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_workday.id}
  4. implantado o pipeline usando o Databricks CLI:

    Bash
    databricks bundle deploy

Opção 3: Databricks Notebook

O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:

  • include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
  • exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.

Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.

  1. Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.

  2. Gerar tokens de acesso pessoal.

  3. Cole o código a seguir em uma célula do Python Notebook, modificando o valor <personal-access-token>:

    Python
    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
  4. Cole o código a seguir em uma segunda célula do Notebook:

    Python
    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json

    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"

    headers = {
    'Authorization': 'Bearer {}'.format(api_token),
    'Content-Type': 'application/json'
    }

    def check_response(response):
    if response.status_code == 200:
    print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
    else:
    print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)

    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)

    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)

    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
  5. Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:

    Python
    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.

    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
    "connection_name": "<YOUR_CONNECTON_NAME>",
    "objects": [
    {
    "report": {
    "source_url": "<YOUR_REPORT_URL>",
    "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
    "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
    "destination_table": "<YOUR_DATABRICKS_TABLE>",
    "table_configuration": {
    "primary_keys": ["<PRIMARY_KEY>"]
    }
    }
    }, {
    "report": {
    "source_url": "<YOUR_SECOND_REPORT_URL>",
    "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
    "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
    "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
    "table_configuration": {
    "primary_keys": ["<PRIMARY_KEY>"],
    "scd_type": "SCD_TYPE_2",
    "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
    }
    }
    }
    ]
    }
    }
    """

    create_pipeline(pipeline_spec)
  6. executar a primeira célula do Notebook com seus tokens de acesso pessoal.

  7. executar a segunda célula do Notebook.

  8. Execute a terceira célula do Notebook com seus detalhes pipeline. Esta execução create_pipeline.

    • list_pipeline retorna a ID do pipeline e seus detalhes.
    • edit_pipeline permite que o senhor edite a definição do pipeline.
    • delete_pipeline exclui o pipeline.

Opção 4: Databricks CLI

O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:

  • include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
  • exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.

Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.

  1. Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.
  2. Execute o seguinte comando para criar o pipeline:
databricks pipelines create --json "<pipeline-definition OR json-file-path>"

definição de pipeline padrão

A seguir, é apresentada uma definição do padrão JSON pipeline :

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"report": {

"source_url": "<report-url>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"primary_keys": ["<primary-key>"],

"scd_type": "SCD_TYPE_2",

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

começar, programar e definir alertas em seu pipeline

O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .

  4. Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.

Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.

Exemplo: ingerir dois relatórios do Workday em esquemas separados

O exemplo de definição de pipeline nesta seção ingere dois relatórios do Workday em esquemas separados. O suporte a pipeline de vários destinos é somente de API.

YAML
resources:
pipelines:
pipeline_workday:
name: workday_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <workday-connection>
objects:
- report:
source_url: <report-url-1>
destination_catalog: my_catalog_1
destination_schema: my_schema_1
destination_table: my_table_1
table_configuration:
primary_keys:
- <primary_key_column>
- report:
source_url: <report-url-2>
destination_catalog: my_catalog_2
destination_schema: my_schema_2
destination_table: my_table_2
table_configuration:
primary_keys:
- <primary_key_column>

Recurso adicional