Ingerir relatórios do Workday
Prévia
O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .
Este artigo descreve como ingerir relatórios do Workday e carregá-los em Databricks usando o LakeFlow Connect. A ingestão resultante pipeline é governada por Unity Catalog e é alimentada por serverless compute e Delta Live Tables.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
Seu workspace está habilitado para Unity Catalog.
O compute sem servidor está habilitado para Notebook, fluxo de trabalho e Delta Live Tables. Consulte Ativar serverless compute .
Para criar uma conexão: Você tem
CREATE CONNECTION
na metastore.Para usar uma conexão existente: Você tem
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão.USE CATALOG
no catálogo de destino.USE SCHEMA
eCREATE TABLE
em um esquema existente ouCREATE SCHEMA
no catálogo de destino.
Criar uma conexão com o Workday
Permissões necessárias: CREATE CONNECTION
no metastore.
Para criar uma conexão com o Workday, faça o seguinte:
No site Databricks workspace, clique em Catalog > External locations > Connections > Create connection.
Para Connection name (Nome da conexão), digite um nome exclusivo para a conexão do Workday.
Para Tipo de conexão, selecione Relatórios do Workday.
Para o tipo de autenticação, selecione OAuth refresh tokens e, em seguida, insira o ID do cliente, o segredo do cliente e os tokens de atualização que o senhor gerou durante a configuração da origem.
Na página Criar conexão, clique em Criar.
Criar um pipeline de ingestão
Este passo descreve como configurar a ingestão pipeline. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome (mas tudo em letras minúsculas) no destino, a menos que o senhor a tenha renomeado explicitamente.
Este tab descreve como implantar uma ingestão pipeline usando Databricks pacotes ativos (DABs). Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
Crie um novo pacote usando a CLI do Databricks:
databricks bundle init
Adicione dois novos arquivos de recurso ao pacote:
Um arquivo de definição de pipeline (
resources/workday_pipeline.yml
).Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/workday_job.yml
).
Veja a seguir um exemplo de arquivo
resources/workday_pipeline.yml
:variables: dest_catalog: default: main dest_schema: default: ingest_destination_schema # The main pipeline for workday_dab resources: pipelines: pipeline_workday: name: workday_pipeline channel: PREVIEW catalog: ${var.dest_catalog} schema: ${var.dest_schema} ingestion_definition: connection_name: <workday-connection> objects: # An array of objects to ingest from Workday. This example # ingests a sample report about all active employees. The Employee_ID key is used as # the primary key for the report. - report: source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} destination_table: All_Active_Employees_Data table_configuration: primary_keys: - Employee_ID
Veja a seguir um exemplo de arquivo
resources/workday_job.yml
:resources: jobs: workday_dab_job: name: workday_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_workday.id}
implantado o pipeline usando o Databricks CLI:
databricks bundle deploy
Gerar um site pessoal access token.
Cole o código a seguir em uma célula do Python Notebook, modificando o valor
<personal-access-token>
:# SHOULD MODIFY # This step sets up a PAT to make API calls to the Databricks service. api_token = "<personal-access-token>"
Cole o código a seguir em uma segunda célula do Notebook:
# DO NOT MODIFY # This step sets up a connection to make API calls to the Databricks service. import requests import json notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext() workspace_url = notebook_context.apiUrl().get() api_url = f"{workspace_url}/api/2.0/pipelines" headers = { 'Authorization': 'Bearer {}'.format(api_token), 'Content-Type': 'application/json' } def check_response(response): if response.status_code == 200: print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False))) else: print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}") # DO NOT MODIFY # These are API definition to be used. def create_pipeline(pipeline_definition: str): response = requests.post(url=api_url, headers=headers, data=pipeline_definition) check_response(response) def edit_pipeline(id: str, pipeline_definition: str): response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition) check_response(response) def delete_pipeline(id: str): response = requests.delete(url=f"{api_url}/{id}", headers=headers) check_response(response) def get_pipeline(id: str): response = requests.get(url=f"{api_url}/{id}", headers=headers) check_response(response) def list_pipeline(filter: str = ""): body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}""" response = requests.get(url=api_url, headers=headers, data=body) check_response(response)
Cole o código a seguir em uma terceira célula do Notebook, modificando-o para refletir as especificações do site pipeline:
# SHOULD MODIFY # Update this notebook to configure your ingestion pipeline. pipeline_spec = """ { "name": "<YOUR_PIPELINE_NAME>", "ingestion_definition": { "connection_name": "<YOUR_CONNECTON_NAME>", "objects": [ { "report": { "source_url": "<YOUR_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"] } } }, { "report": { "source_url": "<YOUR_SECOND_REPORT_URL>", "destination_catalog": "<YOUR_DATABRICKS_CATALOG>", "destination_schema": "<YOUR_DATABRICKS_SCHEMA>", "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>", "table_configuration": { "primary_keys": ["<PRIMARY_KEY>"], "scd_type": "SCD_TYPE_2" } } } ] }, "channel": "PREVIEW" } """ create_pipeline(pipeline_spec)
executar o primeiro celular Notebook com seu access token pessoal.
executar a segunda célula do Notebook.
Execute a terceira célula do Notebook com seus detalhes pipeline. Essa execução
create_pipeline
.list_pipeline
retorna a ID do pipeline e seus detalhes.edit_pipeline
permite que o senhor edite a definição do pipeline.delete_pipeline
exclui o pipeline.
Para criar o pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
Para editar o pipeline:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
Para obter a definição do pipeline:
databricks pipelines get "<your_pipeline_id>"
Para excluir o pipeline:
databricks pipelines delete "<your_pipeline_id>"
Para obter mais informações, o senhor pode sempre executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
começar, programar e definir alertas em seu pipeline
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em Delta Live Tables.
O novo pipeline aparece na lista pipeline.
Para acessar view os detalhes de pipeline, clique no nome pipeline.
Na página de detalhes do pipeline, execute o pipeline clicando em começar. O senhor pode programar o pipeline clicando em programar.
Para definir o alerta no site pipeline, clique em programar, clique em Mais opções e, em seguida, adicione uma notificação.
Após a conclusão da ingestão, o senhor pode consultar suas tabelas.