Pular para o conteúdo principal

Criar um pipeline de ingestão do Microsoft Dynamics 365

info

Visualização

Este recurso está em Pré-visualização Pública.

Esta página descreve como criar um pipeline de ingestão gerenciado que sincroniza dados do Microsoft Dynamics 365 com tabelas Delta Lake usando LakeFlow Connect.

Interfaces suportadas

Você pode criar um pipeline de ingestão do D365 usando:

  • Notebooks do Databricks
  • CLI do Databricks
  • Databricks Asset Bundles
nota

No momento, o conector do Dynamics 365 não oferece suporte à criação pipeline baseada em interface do usuário. Utilize uma das interfaces programáticas listadas acima.

Requisitos

Antes de criar um pipeline, conclua os seguintes pré-requisitos:

  • Configure sua fonte de dados do D365 com Azure Synapse Link e a autenticação Microsoft Entra ID.
  • Crie uma conexão Unity Catalog para o D365 no Explorador de Catálogos.
  • Identifique o URL ou ID do ambiente Dataverse (seu source_schema).
  • Identifique os nomes lógicos das tabelas do D365 que você deseja ingerir (seus valores source_table ).
  • Decida qual o seu tipo de SCD (Tipo 1 ou Tipo 2).
  • Criar ou identificar o catálogo e o esquema de destino do Unity Catalog.
dica

Para encontrar os nomes lógicos das tabelas, use a API do Dataverse ou o portal do criador do Power Apps. No Power Apps, acesse Tabelas e view a coluna Nome lógico .

Opção 1: Notebook do Databricks

Utilize o seguinte código Notebook para criar um pipeline de ingestão do D365:

Python
# Import the ingestion API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import IngestionPipelineDefinition

# Initialize the workspace client
w = WorkspaceClient()

# Define the pipeline configuration
pipeline_config = IngestionPipelineDefinition(
# Required: Use PREVIEW channel
channel="PREVIEW",

# Your Unity Catalog connection name
connection_name="d365_connection",

# Dataverse environment URL or ID
source_schema="https://yourorg.crm.dynamics.com",

# List of D365 tables to ingest (logical names)
source_table=[
"account", # Accounts
"contact", # Contacts
"opportunity", # Opportunities
"salesorder" # Sales Orders
],

# Target location in Unity Catalog
destination_catalog="main",
destination_schema="d365_data",

# History tracking: SCD_TYPE_1 (history tracking off) or SCD_TYPE_2 (history tracking on)
scd_type="SCD_TYPE_2"
)

# Create the pipeline
pipeline = w.pipelines.create(
name="d365_sales_ingestion",
ingestion_definition=pipeline_config
)

print(f"Pipeline created with ID: {pipeline.pipeline_id}")

Ingestão de múltiplos esquemas

Para importar tabelas de vários ambientes Dataverse, crie um pipeline separado:

Python
# Pipeline for production environment
prod_pipeline = w.pipelines.create(
name="d365_prod_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://prod.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_prod",
scd_type="SCD_TYPE_2"
)
)

# Pipeline for test environment
test_pipeline = w.pipelines.create(
name="d365_test_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://test.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_test",
scd_type="SCD_TYPE_2"
)
)

Selecionando colunas específicas

Para importar apenas colunas específicas das tabelas de origem, use a seleção de colunas:

Python
pipeline_config = IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://yourorg.crm.dynamics.com",
source_table=["account"],
destination_catalog="main",
destination_schema="d365_data",
scd_type="SCD_TYPE_1",

# Specify columns to include
table_configuration={
"account": {
"columns": [
"accountid",
"name",
"accountnumber",
"emailaddress1",
"telephone1",
"address1_city",
"address1_stateorprovince"
]
}
}
)

Opção 2: CLI do Databricks

Utilize a CLI Databricks para criar um pipeline a partir de um arquivo de configuração.

Crie um arquivo chamado d365-pipeline.json:

JSON
{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": ["account", "contact", "opportunity", "salesorder"],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}

Crie o pipeline usando a CLI:

Bash
databricks pipelines create --json @d365-pipeline.json

Opção 3: Pacotes Databricks ativos

Use Databricks ativos Bundles para gerenciar o pipeline D365 como código.

Crie um arquivo chamado databricks.yml:

YAML
resources:
pipelines:
d365_sales_ingestion:
name: 'd365_sales_ingestion'
ingestion_definition:
channel: 'PREVIEW'
connection_name: 'd365_connection'
source_schema: 'https://yourorg.crm.dynamics.com'
source_table:
- 'account'
- 'contact'
- 'opportunity'
- 'salesorder'
destination_catalog: 'main'
destination_schema: 'd365_data'
scd_type: 'SCD_TYPE_2'

implantar o pacote:

Bash
databricks bundle deploy

Opção 4: APIs do Databricks

Utilize a APIde pipeline para criar um pipeline de ingestão do D365.

Exemplo de solicitação à API:

Bash
curl -X POST \
https://<workspace-url>/api/2.0/pipelines \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": [
"account",
"contact",
"opportunity",
"salesorder"
],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}'

Verificar a criação do pipeline

Após a criação do pipeline:

  1. Acesse "Trabalhos e pipeline" no seu workspace.
  2. Localize seu pipeline pelo nome.
  3. Selecione o pipeline para view os detalhes.
  4. Selecione começar para executar a ingestão inicial.
  5. Monitore a execução pipeline e verifique se pipeline cria tabelas no esquema de destino.

Para verificar os dados ingeridos:

SQL
-- Check the account table
SELECT * FROM main.d365_data.account LIMIT 10;

-- Verify record counts
SELECT COUNT(*) FROM main.d365_data.account;
nota

A execução inicial pipeline realiza uma refresh completa de todas as tabelas selecionadas. A execução subsequente utiliza a ingestão incremental baseada no cursor VersionNumber dos registros de alterações do Azure Synapse Link.

Próximos passos

Após criar seu pipeline: