Criar um pipeline de ingestão do Microsoft Dynamics 365
Visualização
Este recurso está em Pré-visualização Pública.
Esta página descreve como criar um pipeline de ingestão gerenciado que sincroniza dados do Microsoft Dynamics 365 com tabelas Delta Lake usando LakeFlow Connect.
Interfaces suportadas
Você pode criar um pipeline de ingestão do D365 usando:
- Notebooks do Databricks
- CLI do Databricks
- Databricks Asset Bundles
No momento, o conector do Dynamics 365 não oferece suporte à criação pipeline baseada em interface do usuário. Utilize uma das interfaces programáticas listadas acima.
Requisitos
Antes de criar um pipeline, conclua os seguintes pré-requisitos:
- Configure sua fonte de dados do D365 com Azure Synapse Link e a autenticação Microsoft Entra ID.
- Crie uma conexão Unity Catalog para o D365 no Explorador de Catálogos.
- Identifique o URL ou ID do ambiente Dataverse (seu
source_schema). - Identifique os nomes lógicos das tabelas do D365 que você deseja ingerir (seus valores
source_table). - Decida qual o seu tipo de SCD (Tipo 1 ou Tipo 2).
- Criar ou identificar o catálogo e o esquema de destino do Unity Catalog.
Para encontrar os nomes lógicos das tabelas, use a API do Dataverse ou o portal do criador do Power Apps. No Power Apps, acesse Tabelas e view a coluna Nome lógico .
Opção 1: Notebook do Databricks
Utilize o seguinte código Notebook para criar um pipeline de ingestão do D365:
# Import the ingestion API
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import IngestionPipelineDefinition
# Initialize the workspace client
w = WorkspaceClient()
# Define the pipeline configuration
pipeline_config = IngestionPipelineDefinition(
# Required: Use PREVIEW channel
channel="PREVIEW",
# Your Unity Catalog connection name
connection_name="d365_connection",
# Dataverse environment URL or ID
source_schema="https://yourorg.crm.dynamics.com",
# List of D365 tables to ingest (logical names)
source_table=[
"account", # Accounts
"contact", # Contacts
"opportunity", # Opportunities
"salesorder" # Sales Orders
],
# Target location in Unity Catalog
destination_catalog="main",
destination_schema="d365_data",
# History tracking: SCD_TYPE_1 (history tracking off) or SCD_TYPE_2 (history tracking on)
scd_type="SCD_TYPE_2"
)
# Create the pipeline
pipeline = w.pipelines.create(
name="d365_sales_ingestion",
ingestion_definition=pipeline_config
)
print(f"Pipeline created with ID: {pipeline.pipeline_id}")
Ingestão de múltiplos esquemas
Para importar tabelas de vários ambientes Dataverse, crie um pipeline separado:
# Pipeline for production environment
prod_pipeline = w.pipelines.create(
name="d365_prod_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://prod.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_prod",
scd_type="SCD_TYPE_2"
)
)
# Pipeline for test environment
test_pipeline = w.pipelines.create(
name="d365_test_ingestion",
ingestion_definition=IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://test.crm.dynamics.com",
source_table=["account", "contact"],
destination_catalog="main",
destination_schema="d365_test",
scd_type="SCD_TYPE_2"
)
)
Selecionando colunas específicas
Para importar apenas colunas específicas das tabelas de origem, use a seleção de colunas:
pipeline_config = IngestionPipelineDefinition(
channel="PREVIEW",
connection_name="d365_connection",
source_schema="https://yourorg.crm.dynamics.com",
source_table=["account"],
destination_catalog="main",
destination_schema="d365_data",
scd_type="SCD_TYPE_1",
# Specify columns to include
table_configuration={
"account": {
"columns": [
"accountid",
"name",
"accountnumber",
"emailaddress1",
"telephone1",
"address1_city",
"address1_stateorprovince"
]
}
}
)
Opção 2: CLI do Databricks
Utilize a CLI Databricks para criar um pipeline a partir de um arquivo de configuração.
Crie um arquivo chamado d365-pipeline.json:
{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": ["account", "contact", "opportunity", "salesorder"],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}
Crie o pipeline usando a CLI:
databricks pipelines create --json @d365-pipeline.json
Opção 3: Pacotes Databricks ativos
Use Databricks ativos Bundles para gerenciar o pipeline D365 como código.
Crie um arquivo chamado databricks.yml:
resources:
pipelines:
d365_sales_ingestion:
name: 'd365_sales_ingestion'
ingestion_definition:
channel: 'PREVIEW'
connection_name: 'd365_connection'
source_schema: 'https://yourorg.crm.dynamics.com'
source_table:
- 'account'
- 'contact'
- 'opportunity'
- 'salesorder'
destination_catalog: 'main'
destination_schema: 'd365_data'
scd_type: 'SCD_TYPE_2'
implantar o pacote:
databricks bundle deploy
Opção 4: APIs do Databricks
Utilize a APIde pipeline para criar um pipeline de ingestão do D365.
Exemplo de solicitação à API:
curl -X POST \
https://<workspace-url>/api/2.0/pipelines \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{
"name": "d365_sales_ingestion",
"ingestion_definition": {
"channel": "PREVIEW",
"connection_name": "d365_connection",
"source_schema": "https://yourorg.crm.dynamics.com",
"source_table": [
"account",
"contact",
"opportunity",
"salesorder"
],
"destination_catalog": "main",
"destination_schema": "d365_data",
"scd_type": "SCD_TYPE_2"
}
}'
Verificar a criação do pipeline
Após a criação do pipeline:
- Acesse "Trabalhos e pipeline" no seu workspace.
- Localize seu pipeline pelo nome.
- Selecione o pipeline para view os detalhes.
- Selecione começar para executar a ingestão inicial.
- Monitore a execução pipeline e verifique se pipeline cria tabelas no esquema de destino.
Para verificar os dados ingeridos:
-- Check the account table
SELECT * FROM main.d365_data.account LIMIT 10;
-- Verify record counts
SELECT COUNT(*) FROM main.d365_data.account;
A execução inicial pipeline realiza uma refresh completa de todas as tabelas selecionadas. A execução subsequente utiliza a ingestão incremental baseada no cursor VersionNumber dos registros de alterações do Azure Synapse Link.
Próximos passos
Após criar seu pipeline:
- programar atualizações pipeline para execução automaticamente.
- Monitore a integridade pipeline e configure alertas.
- Execute uma refreshcompleta se precisar recarregar todos os dados.
- Consulte as orientações de resolução de problemas caso ocorram problemas.
- Compreender o comportamento da evolução do esquema para lidar com alterações na origem.