Ingest Workday reports

This article describes how to ingest Workday reports and load them into Databricks using LakeFlow Connect. The resulting ingestion pipeline is governed by Unity Catalog and is powered by serverless compute and DLT.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:
- Your workspace is enabled for Unity Catalog.
- Serverless compute is enabled for your workspace. See Enable serverless compute.
- If you plan to create a connection: You have `CREATE CONNECTION` privileges on the metastore.
- If you plan to use an existing connection: You have `USE CONNECTION` privileges or `ALL PRIVILEGES` on the connection object.
- You have `USE CATALOG` privileges on the target catalog.
- You have `USE SCHEMA` and `CREATE TABLE` privileges on an existing schema, or `CREATE SCHEMA` privileges on the target catalog.
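If you need to grant these privileges, the following is a minimal sketch using Unity Catalog GRANT statements from a notebook. The principal, connection, catalog, and schema names are placeholders, and the exact grants you need depend on which of the options above apply to you.

```python
# A sketch only: grants the privileges listed above to a hypothetical principal.
# Adjust the principal, connection, catalog, and schema names for your environment.
principal = "`data-engineers@example.com`"  # hypothetical group

grants = [
    f"GRANT CREATE CONNECTION ON METASTORE TO {principal}",                      # to create new connections
    f"GRANT USE CONNECTION ON CONNECTION my_workday_connection TO {principal}",  # to reuse an existing connection
    f"GRANT USE CATALOG ON CATALOG main TO {principal}",
    f"GRANT USE SCHEMA, CREATE TABLE ON SCHEMA main.ingest_destination_schema TO {principal}",
    # Alternatively, allow the principal to create the destination schema itself:
    # f"GRANT CREATE SCHEMA ON CATALOG main TO {principal}",
]

for stmt in grants:
    spark.sql(stmt)
```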
To consume data from Workday, see Configure Workday reports for ingestion.

Create a Workday connection

Permissions required: `CREATE CONNECTION` on the metastore.

To create a Workday connection, do the following:
- In your Databricks workspace, click Catalog > External locations > Connections > Create connection.
- In Connection name, enter a unique name for the Workday connection.
- For Connection type, select Workday Reports.
- For Auth type, select OAuth refresh tokens, and then enter the Client ID, Client secret, and Refresh token that you generated during source setup.
- On the Create Connection page, click Create.
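Optionally, you can confirm that the connection was created by listing Unity Catalog connections over the REST API. The following is a sketch that assumes a personal access token and your workspace URL.

```python
# A sketch only: lists Unity Catalog connections and prints their names so you can
# confirm the new Workday connection appears. Placeholders: workspace URL and token.
import requests

workspace_url = "https://<your-workspace-url>"
api_token = "<personal-access-token>"

response = requests.get(
    f"{workspace_url}/api/2.1/unity-catalog/connections",
    headers={"Authorization": f"Bearer {api_token}"},
)
response.raise_for_status()
print([conn["name"] for conn in response.json().get("connections", [])])
```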
Create an ingestion pipeline

This step describes how to configure the ingestion pipeline. Each ingested table gets a corresponding streaming table with the same name (but all lowercase) in the destination, unless you explicitly rename it.
- Databricks Asset Bundles
- Notebook
- CLI
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:
- `include_columns`: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- `exclude_columns`: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- Create a new bundle using the Databricks CLI:

  ```bash
  databricks bundle init
  ```

- Add two new resource files to the bundle:

  - A pipeline definition file (`resources/workday_pipeline.yml`).
  - A workflow file that controls the frequency of data ingestion (`resources/workday_job.yml`).
  The following is an example `resources/workday_pipeline.yml` file:

  ```yaml
  variables:
    dest_catalog:
      default: main
    dest_schema:
      default: ingest_destination_schema

  # The main pipeline for workday_dab
  resources:
    pipelines:
      pipeline_workday:
        name: workday_pipeline
        catalog: ${var.dest_catalog}
        target: ${var.dest_schema}
        ingestion_definition:
          connection_name: <workday-connection>
          objects:
            # An array of objects to ingest from Workday. This example
            # ingests a sample report about all active employees. The Employee_ID key is used as
            # the primary key for the report.
            - report:
                source_url: https://wd2-impl-services1.workday.com/ccx/service/customreport2/All_Active_Employees_Data?format=json
                destination_catalog: ${var.dest_catalog}
                destination_schema: ${var.dest_schema}
                destination_table: All_Active_Employees_Data
                table_configuration:
                  primary_keys:
                    - Employee_ID
                  include_columns: # This can be exclude_columns instead
                    - <column_a>
                    - <column_b>
                    - <column_c>
  ```

  The following is an example `resources/workday_job.yml` file:

  ```yaml
  resources:
    jobs:
      workday_dab_job:
        name: workday_dab_job

        trigger:
          # Run this job every day, exactly one day from the last run
          # See https://docs.databricks.com/api/workspace/jobs/create#trigger
          periodic:
            interval: 1
            unit: DAYS

        email_notifications:
          on_failure:
            - <email-address>

        tasks:
          - task_key: refresh_pipeline
            pipeline_task:
              pipeline_id: ${resources.pipelines.pipeline_workday.id}
  ```
- Deploy the pipeline using the Databricks CLI:

  ```bash
  databricks bundle deploy
  ```
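After the deploy completes, you can optionally confirm that the pipeline was created and capture its ID, for example by filtering the Pipelines API by name. The following is a sketch that assumes a personal access token and the pipeline name from the bundle definition above.

```python
# A sketch only: looks up the deployed pipeline by name via the Pipelines API.
# Placeholders: workspace URL and token; the name matches resources/workday_pipeline.yml.
import requests

workspace_url = "https://<your-workspace-url>"
api_token = "<personal-access-token>"

response = requests.get(
    f"{workspace_url}/api/2.0/pipelines",
    headers={"Authorization": f"Bearer {api_token}"},
    params={"filter": "name LIKE 'workday_pipeline'"},
)
response.raise_for_status()
for pipeline in response.json().get("statuses", []):
    print(pipeline["pipeline_id"], pipeline["name"])
```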
You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:
- `include_columns`: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- `exclude_columns`: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- Generate a personal access token.

- Paste the following code into a Python notebook cell, modifying the `<personal-access-token>` value:

  ```python
  # SHOULD MODIFY
  # This step sets up a PAT to make API calls to the Databricks service.
  api_token = "<personal-access-token>"
  ```
- Paste the following code into a second notebook cell:

  ```python
  # DO NOT MODIFY
  # This step sets up a connection to make API calls to the Databricks service.
  import requests
  import json

  notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
  workspace_url = notebook_context.apiUrl().get()
  api_url = f"{workspace_url}/api/2.0/pipelines"
  headers = {
      'Authorization': 'Bearer {}'.format(api_token),
      'Content-Type': 'application/json'
  }

  def check_response(response):
      if response.status_code == 200:
          print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
      else:
          print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

  # DO NOT MODIFY
  # These are the API definitions to be used.
  def create_pipeline(pipeline_definition: str):
      response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
      check_response(response)

  def edit_pipeline(id: str, pipeline_definition: str):
      response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
      check_response(response)

  def delete_pipeline(id: str):
      response = requests.delete(url=f"{api_url}/{id}", headers=headers)
      check_response(response)

  def get_pipeline(id: str):
      response = requests.get(url=f"{api_url}/{id}", headers=headers)
      check_response(response)

  def list_pipeline(filter: str = ""):
      body = "" if len(filter) == 0 else f"""{{"filter": "{filter} AND pipeline_type IN ('MANAGED_INGESTION')"}}"""
      response = requests.get(url=api_url, headers=headers, data=body)
      check_response(response)
  ```
- Paste the following code into a third notebook cell, modifying it to reflect your pipeline specifications:

  ```python
  # SHOULD MODIFY
  # Update this notebook to configure your ingestion pipeline.
  pipeline_spec = """
  {
    "name": "<YOUR_PIPELINE_NAME>",
    "ingestion_definition": {
      "connection_name": "<YOUR_CONNECTION_NAME>",
      "objects": [
        {
          "report": {
            "source_url": "<YOUR_REPORT_URL>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
            "destination_table": "<YOUR_DATABRICKS_TABLE>",
            "table_configuration": {
              "primary_keys": ["<PRIMARY_KEY>"]
            }
          }
        },
        {
          "report": {
            "source_url": "<YOUR_SECOND_REPORT_URL>",
            "destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
            "destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
            "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
            "table_configuration": {
              "primary_keys": ["<PRIMARY_KEY>"],
              "scd_type": "SCD_TYPE_2",
              "include_columns": ["<column_a>", "<column_b>", "<column_c>"]
            }
          }
        }
      ]
    }
  }
  """

  create_pipeline(pipeline_spec)
  ```
- Run the first notebook cell with your personal access token.

- Run the second notebook cell.

- Run the third notebook cell with your pipeline details. This runs `create_pipeline`.

  - `list_pipeline` returns the pipeline ID and its details.
  - `edit_pipeline` allows you to edit the pipeline definition.
  - `delete_pipeline` deletes the pipeline.
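To trigger a run of the newly created pipeline from the same notebook, you can call the updates endpoint of the Pipelines API. The following is a sketch that reuses `api_url`, `headers`, and `check_response` from the second cell; the pipeline ID placeholder is the value returned by `create_pipeline` or `list_pipeline`.

```python
# A sketch only: starts a pipeline update (refresh) using the Pipelines API.
# Replace <pipeline-id> with the ID returned by create_pipeline() or list_pipeline().
def start_pipeline_update(id: str):
    response = requests.post(url=f"{api_url}/{id}/updates", headers=headers)
    check_response(response)

start_pipeline_update("<pipeline-id>")
```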
You can use the following table configuration properties in your pipeline definition to select or deselect specific columns to ingest:
- `include_columns`: Optionally specify a list of columns to include for ingestion. If you use this option to explicitly include columns, the pipeline automatically excludes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
- `exclude_columns`: Optionally specify a list of columns to exclude from ingestion. If you use this option to explicitly exclude columns, the pipeline automatically includes columns that are added to the source in the future. To ingest the future columns, you'll have to add them to the list.
To create the pipeline:

```bash
databricks pipelines create --json "<pipeline_definition OR json file path>"
```

To edit the pipeline:

```bash
databricks pipelines update --json "<pipeline_definition OR json file path>"
```

To get the pipeline definition:

```bash
databricks pipelines get "<your_pipeline_id>"
```

To delete the pipeline:

```bash
databricks pipelines delete "<your_pipeline_id>"
```

For more information, you can always run:

```bash
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
```
Example JSON pipeline definition:
"ingestion_definition": {
"connection_name": "<connection-name>",
"objects": [
{
"report": {
"source_url": "<report-url>",
"destination_catalog": "<destination-catalog>",
"destination_schema": "<destination-schema>",
"table_configuration": {
"primary_keys": ["<primary-key>"],
"scd_type": "SCD_TYPE_2",
"include_columns": ["<column-a>", "<column-b>", "<column-c>"]
}
}
}
]
}
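Note that the fragment above is only the `ingestion_definition` block; to pass it to `databricks pipelines create`, embed it in a complete pipeline spec that also includes a `name`. The following is a sketch that writes such a spec to a local JSON file (all names and URLs are placeholders), which you can then supply to the command as shown above.

```python
# A sketch only: builds a complete pipeline spec around the ingestion_definition fragment
# and writes it to workday_pipeline.json for use with `databricks pipelines create`.
import json

pipeline_spec = {
    "name": "<your-pipeline-name>",  # hypothetical pipeline name
    "ingestion_definition": {
        "connection_name": "<connection-name>",
        "objects": [
            {
                "report": {
                    "source_url": "<report-url>",
                    "destination_catalog": "<destination-catalog>",
                    "destination_schema": "<destination-schema>",
                    "table_configuration": {
                        "primary_keys": ["<primary-key>"],
                        "scd_type": "SCD_TYPE_2",
                        "include_columns": ["<column-a>", "<column-b>", "<column-c>"],
                    },
                }
            }
        ],
    },
}

with open("workday_pipeline.json", "w") as f:
    json.dump(pipeline_spec, f, indent=2)
```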
Start, schedule, and set alerts on your pipeline

- After the pipeline has been created, revisit the Databricks workspace, and then click Pipelines. The new pipeline appears in the pipeline list.

- To view the pipeline details, click the pipeline name.

- On the pipeline details page, you can schedule the pipeline by clicking Schedule.

- To set notifications on the pipeline, click Settings, and then add a notification.
Example: Ingest two Workday reports into separate schemas

The example pipeline definition in this section ingests two Workday reports into separate schemas. Multi-destination pipeline support is API-only.
```yaml
resources:
  pipelines:
    pipeline_workday:
      name: workday_pipeline
      catalog: my_catalog_1 # Location of the pipeline event log
      schema: my_schema_1 # Location of the pipeline event log
      ingestion_definition:
        connection_name: <workday-connection>
        objects:
          - report:
              source_url: <report-url-1>
              destination_catalog: my_catalog_1
              destination_schema: my_schema_1
              destination_table: my_table_1
              table_configuration:
                primary_keys:
                  - <primary_key_column>
          - report:
              source_url: <report-url-2>
              destination_catalog: my_catalog_2
              destination_schema: my_schema_2
              destination_table: my_table_2
              table_configuration:
                primary_keys:
                  - <primary_key_column>
```
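Because multi-destination support is API-only, the following is a sketch of an equivalent JSON payload that you could pass to the `create_pipeline()` helper from the Notebook tab or to `databricks pipelines create --json`. The catalog, schema, and table names mirror the placeholders in the YAML above.

```python
# A sketch only: the same two-report, two-schema definition expressed as an API payload.
import json

pipeline_spec = {
    "name": "workday_pipeline",
    "catalog": "my_catalog_1",  # Location of the pipeline event log
    "schema": "my_schema_1",    # Location of the pipeline event log
    "ingestion_definition": {
        "connection_name": "<workday-connection>",
        "objects": [
            {
                "report": {
                    "source_url": "<report-url-1>",
                    "destination_catalog": "my_catalog_1",
                    "destination_schema": "my_schema_1",
                    "destination_table": "my_table_1",
                    "table_configuration": {"primary_keys": ["<primary_key_column>"]},
                }
            },
            {
                "report": {
                    "source_url": "<report-url-2>",
                    "destination_catalog": "my_catalog_2",
                    "destination_schema": "my_schema_2",
                    "destination_table": "my_table_2",
                    "table_configuration": {"primary_keys": ["<primary_key_column>"]},
                }
            },
        ],
    },
}

# Pass json.dumps(pipeline_spec) to create_pipeline() from the Notebook tab,
# or save it to a file for the CLI.
print(json.dumps(pipeline_spec, indent=2))
```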