Ingerir relatórios do Workday

Prévia

O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .

Este artigo descreve como ingerir relatórios do Workday e carregá-los em Databricks usando o LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e Delta Live Tables.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • O compute sem servidor está habilitado para Notebook, fluxo de trabalho e Delta Live Tables. Consulte Ativar serverless compute .

  • Para criar uma conexão: Você tem CREATE CONNECTION na metastore.

    Para usar uma conexão existente: Você tem USE CONNECTION ou ALL PRIVILEGES no objeto de conexão.

  • USE CATALOG no catálogo de destino.

  • USE SCHEMA e CREATE TABLE em um esquema existente ou CREATE SCHEMA no catálogo de destino.

Configurar relatórios do Workday para ingestão

Consulte Configurar relatórios do Workday para ingestão.

Criar uma conexão com o Workday

Permissões necessárias: CREATE CONNECTION no metastore.

Para criar uma conexão com o Workday, faça o seguinte:

  1. No site Databricks workspace, clique em Catalog > External locations > Connections > Create connection.

  2. Para Connection name (Nome da conexão), digite um nome exclusivo para a conexão do Workday.

  3. Para Tipo de conexão, selecione Relatórios do Workday.

  4. Para o tipo de autenticação, selecione OAuth refresh tokens e, em seguida, insira o ID do cliente, o segredo do cliente e os tokens de atualização que o senhor gerou durante a configuração da origem.

  5. Na página Criar conexão, clique em Criar.

Observação

Testar a conexão para verificar se o host está acessível. Ele não testa as credenciais do usuário quanto aos valores corretos de nome de usuário e senha.

Criar um pipeline do Delta Live Tables

Este passo descreve como configurar a ingestão pipeline. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome (mas tudo em letras minúsculas) no destino, a menos que o senhor a tenha renomeado explicitamente.

  1. Gerar um site pessoal access token.

  2. Cole o código a seguir em uma célula do Python Notebook, modificando o valor <personal-access-token>:

    # SHOULD MODIFY
    # This step sets up a PAT to make API calls to the Databricks service.
    api_token = "<personal-access-token>"
    
  3. Cole o código a seguir em uma segunda célula do Notebook:

    # DO NOT MODIFY
    # This step sets up a connection to make API calls to the Databricks service.
    import requests
    import json
    
    
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    workspace_url = notebook_context.apiUrl().get()
    api_url = f"{workspace_url}/api/2.0/pipelines"
    
    
    headers = {
       'Authorization': 'Bearer {}'.format(api_token),
       'Content-Type': 'application/json'
    }
    
    
    def check_response(response):
       if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
       else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
    
    
    # DO NOT MODIFY
    # These are API definition to be used.
    def create_pipeline(pipeline_definition: str):
    response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def edit_pipeline(id: str, pipeline_definition: str):
    response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
    check_response(response)
    
    
    def delete_pipeline(id: str):
    response = requests.delete(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def get_pipeline(id: str):
    response = requests.get(url=f"{api_url}/{id}", headers=headers)
    check_response(response)
    
    
    def list_pipeline(filter: str = ""):
    body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
    response = requests.get(url=api_url, headers=headers, data=body)
    check_response(response)
    
  4. Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:

    # SHOULD MODIFY
    # Update this notebook to configure your ingestion pipeline.
    
    pipeline_spec = """
    {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
       "connection_name": "<YOUR_CONNECTON_NAME>",
       "objects": [
          {
             "report": {
             "source_url": "<YOUR_REPORT_URL>,
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_TABLE>",
             "table_configuration": {
                   "primary_keys": [<PRIMARY_KEY>]
                }
             }
          }, {
             "report": {
             "source_url": "<YOUR_SECOND_REPORT_URL>,
             "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
             "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
             "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
             "table_configuration": {
                   "primary_keys": [<PRIMARY_KEY>],
                   "scd_type": "SCD_TYPE_2"
                }
             }
          }
       ]
    },
    "channel": "CURRENT"
    }
    """
    
    
    create_pipeline(pipeline_spec)
    
  5. executar o primeiro celular Notebook com seu access token pessoal.

  6. executar a segunda célula do Notebook.

  7. Execute a terceira célula do Notebook com seus detalhes pipeline. Essa execução create_pipeline.

    • list_pipeline retorna a ID do pipeline e seus detalhes.

    • edit_pipeline permite que o senhor edite a definição do pipeline.

    • delete_pipeline exclui o pipeline.

Para criar o pipeline:

databricks pipelines create --json "<pipeline_definition OR json file path>"

Para editar o pipeline:

databricks pipelines update --json "<<pipeline_definition OR json file path>"

Para obter a definição do pipeline:

databricks pipelines get "<your_pipeline_id>"

Para excluir o pipeline:

databricks pipelines delete "<your_pipeline_id>"

Para obter mais informações, o senhor pode sempre executar:

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

começar, programar e definir alertas em seu pipeline

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em Delta Live Tables.

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, execute o pipeline clicando em começar. O senhor pode programar o pipeline clicando em programar.

  4. Para definir o alerta no site pipeline, clique em programar, clique em Mais opções e, em seguida, adicione uma notificação.

  5. Após a conclusão da ingestão, o senhor pode consultar suas tabelas.