Ingira relatórios do Workday
Esta página descreve como ingerir relatórios do Workday e carregá-los em Databricks usando LakeFlow Connect.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
- 
Seu workspace deve estar habilitado para o Unity Catalog. 
- 
O compute sem servidor deve estar habilitado para o seu workspace. Consulte os requisitos do compute sem servidor. 
- 
Se você planeja criar uma nova conexão: você deve ter privilégios CREATE CONNECTIONna metastore.Se o conector oferecer suporte à criação pipeline baseada em interface de usuário, um administrador poderá criar a conexão e o pipeline ao mesmo tempo, concluindo os passos nesta página. No entanto, se os usuários que criam o pipeline usarem a criação pipeline baseada em API ou não forem usuários administradores, um administrador deverá primeiro criar a conexão no Catalog Explorer. Veja Conectar às fontes de ingestão de gerenciar. 
- 
Se você planeja usar uma conexão existente: você deve ter privilégios USE CONNECTIONouALL PRIVILEGESno objeto de conexão.
- 
Você deve ter privilégios USE CATALOGno catálogo de destino.
- 
Você deve ter privilégios USE SCHEMAeCREATE TABLEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino.
Para ingerir do Workday, você deve concluir a configuração da fonte.
Opção 1: UI da Databricks
- 
Na barra lateral do site Databricks workspace, clique em ingestão de dados . 
- 
Na página Adicionar dados , em Conectores do Databricks , clique em Workday Reports . O assistente de ingestão é aberto. 
- 
Na página Ingestion pipeline (Pipeline de ingestão ) do assistente, digite um nome exclusivo para o pipeline. 
- 
Para o local do evento log , selecione um catálogo e um esquema para armazenar o evento pipeline log. 
- 
Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem. Se não houver conexões existentes com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da fonte. Você deve ter privilégios CREATE CONNECTIONna metastore.
- 
Clique em Create pipeline (Criar pipeline) e continue . 
- 
Na página Relatório , clique em Adicionar relatório e insira o URL do relatório. Repita o procedimento para cada relatório que você deseja consumir e clique em Avançar . 
- 
Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar. Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOGeCREATE SCHEMAno catálogo principal.
- 
Clique em Save pipeline (Salvar pipeline) e continue . 
- 
(Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino. 
- 
(Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline. 
- 
Clique em Save e execute pipeline . 
Opção 2: Databricks ativo Bundles
Esta seção descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
- include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
- exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.
- 
Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis. 
- 
Crie um novo pacote usando a CLI do Databricks: Bashdatabricks bundle init
- 
Adicione dois novos arquivos de recurso ao pacote: - Um arquivo de definição de pipeline (resources/workday_pipeline.yml).
- Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (resources/workday_job.yml).
 Veja a seguir um exemplo de arquivo resources/workday_pipeline.yml:YAMLvariables:
 dest_catalog:
 default: main
 dest_schema:
 default: ingest_destination_schema
 # The main pipeline for workday_dab
 resources:
 pipelines:
 pipeline_workday:
 name: workday_pipeline
 catalog: ${var.dest_catalog}
 schema: ${var.dest_schema}
 ingestion_definition:
 connection_name: <workday-connection>
 objects:
 # An array of objects to ingest from Workday. This example
 # ingests a sample report about all active employees. The Employee_ID key is used as
 # the primary key for the report.
 - report:
 source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
 destination_catalog: ${var.dest_catalog}
 destination_schema: ${var.dest_schema}
 destination_table: All_Active_Employees_Data
 table_configuration:
 primary_keys:
 - Employee_ID
 include_columns: # This can be exclude_columns instead
 - <column_a>
 - <column_b>
 - <column_c>Veja a seguir um exemplo de arquivo resources/workday_job.yml:YAMLresources:
 jobs:
 workday_dab_job:
 name: workday_dab_job
 trigger:
 # Run this job every day, exactly one day from the last run
 # See https://docs.databricks.com/api/workspace/jobs/create#trigger
 periodic:
 interval: 1
 unit: DAYS
 email_notifications:
 on_failure:
 - <email-address>
 tasks:
 - task_key: refresh_pipeline
 pipeline_task:
 pipeline_id: ${resources.pipelines.pipeline_workday.id}
- Um arquivo de definição de pipeline (
- 
implantado o pipeline usando o Databricks CLI: Bashdatabricks bundle deploy
Opção 3: Databricks Notebook
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
- include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
- exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.
- 
Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis. 
- 
Gerar tokens de acesso pessoal. 
- 
Cole o código a seguir em uma célula do Python Notebook, modificando o valor <personal-access-token>:Python# SHOULD MODIFY
 # This step sets up a PAT to make API calls to the Databricks service.
 api_token = "<personal-access-token>"
- 
Cole o código a seguir em uma segunda célula do Notebook: Python# DO NOT MODIFY
 # This step sets up a connection to make API calls to the Databricks service.
 import requests
 import json
 notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
 workspace_url = notebook_context.apiUrl().get()
 api_url = f"{workspace_url}/api/2.0/pipelines"
 headers = {
 'Authorization': 'Bearer {}'.format(api_token),
 'Content-Type': 'application/json'
 }
 def check_response(response):
 if response.status_code == 200:
 print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
 else:
 print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
 # DO NOT MODIFY
 # These are API definition to be used.
 def create_pipeline(pipeline_definition: str):
 response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
 check_response(response)
 def edit_pipeline(id: str, pipeline_definition: str):
 response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
 check_response(response)
 def delete_pipeline(id: str):
 response = requests.delete(url=f"{api_url}/{id}", headers=headers)
 check_response(response)
 def get_pipeline(id: str):
 response = requests.get(url=f"{api_url}/{id}", headers=headers)
 check_response(response)
 def list_pipeline(filter: str = ""):
 body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
 response = requests.get(url=api_url, headers=headers, data=body)
 check_response(response)
- 
Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline: Python# SHOULD MODIFY
 # Update this notebook to configure your ingestion pipeline.
 pipeline_spec = """
 {
 "name": "<YOUR_PIPELINE_NAME>",
 "ingestion_definition": {
 "connection_name": "<YOUR_CONNECTON_NAME>",
 "objects": [
 {
 "report": {
 "source_url": "<YOUR_REPORT_URL>",
 "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
 "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
 "destination_table": "<YOUR_DATABRICKS_TABLE>",
 "table_configuration": {
 "primary_keys": ["<PRIMARY_KEY>"]
 }
 }
 }, {
 "report": {
 "source_url": "<YOUR_SECOND_REPORT_URL>",
 "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
 "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
 "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
 "table_configuration": {
 "primary_keys": ["<PRIMARY_KEY>"],
 "scd_type": "SCD_TYPE_2",
 "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
 }
 }
 }
 ]
 }
 }
 """
 create_pipeline(pipeline_spec)
- 
executar a primeira célula do Notebook com seus tokens de acesso pessoal. 
- 
executar a segunda célula do Notebook. 
- 
Execute a terceira célula do Notebook com seus detalhes pipeline. Esta execução create_pipeline.- list_pipelineretorna a ID do pipeline e seus detalhes.
- edit_pipelinepermite que o senhor edite a definição do pipeline.
- delete_pipelineexclui o pipeline.
 
Opção 4: Databricks CLI
O senhor pode usar as seguintes propriedades de configuração de tabela na definição do pipeline para selecionar ou desmarcar colunas específicas para ingestão:
- include_columns: opcionalmente, especifique uma lista de colunas a serem incluídas para ingestão. Se o senhor usar essa opção para incluir colunas explicitamente, o pipeline excluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
- exclude_columns: opcionalmente, especifique uma lista de colunas a serem excluídas da ingestão. Se o senhor usar essa opção para excluir explicitamente as colunas, o pipeline incluirá automaticamente as colunas que forem adicionadas à origem no futuro. Para ingerir as colunas futuras, você precisará adicioná-las à lista.
Você também pode especificar solicitações na URL do relatório (source_url), o que permite que você ingira relatórios filtrados.
- Confirme se existe uma conexão do Unity Catalog com o Workday. Para ver as etapas de criação de uma conexão, consulte Conectar-se a fontes de ingestão gerenciáveis.
- Execute o seguinte comando para criar o pipeline:
databricks pipelines create --json "<pipeline-definition OR json-file-path>"
definição de pipeline padrão
A seguir, é apresentada uma definição do padrão JSON pipeline :
"ingestion_definition": {
     "connection_name": "<connection-name>",
     "objects": [
       {
         "report": {
           "source_url": "<report-url>",
           "destination_catalog": "<destination-catalog>",
           "destination_schema": "<destination-schema>",
           "table_configuration": {
              "primary_keys": ["<primary-key>"],
              "scd_type": "SCD_TYPE_2",
              "include_columns": ["<column-a>", "<column-b>", "<column-c>"]
           }
         }
       }
     ]
 }
começar, programar e definir alertas em seu pipeline
O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.
- 
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline . O novo pipeline aparece na lista pipeline. 
- 
Para acessar view os detalhes de pipeline, clique no nome pipeline. 
- 
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar . 
- 
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação. 
Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.
Exemplo: ingerir dois relatórios do Workday em esquemas separados
O exemplo de definição de pipeline nesta seção ingere dois relatórios do Workday em esquemas separados. O suporte a pipeline de vários destinos é somente de API.
resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>