Pular para o conteúdo principal

Configurar fonte de dados para ingestão Microsoft Dynamics 365

info

Visualização

Este recurso está em Pré-visualização Pública.

Aprenda como configurar Microsoft Dynamics 365 como fonte de dados para ingestão no Databricks usando LakeFlow Connect.

Para obter informações sobre como o conector acessa seus dados de origem, consulte Como o conector acessa os dados do D365?. Para obter uma lista dos aplicativos Dataverse compatíveis, consulte Quais aplicativos do Dynamics 365 são compatíveis?.

Pré-requisitos

Antes de configurar a fonte de dados do D365, você precisa ter:

  • Uma inscrição ativa Azure com permissões para criar recurso.
  • Um ambiente Microsoft Dynamics 365 com acesso de administrador.
  • Um ambiente Dataverse associado à sua instância do D365.
  • Permissões de administrador do espaço de trabalho ou administrador do metastore no Databricks.
  • Permissões para criar e configurar o Azure Synapse Link no seu ambiente Dataverse.
  • Uma account de armazenamento ADLS Gen2 (ou permissões para criar uma).
  • Permissões para criar e configurar aplicativos Microsoft Entra ID.
  • API Dataverse v9.2 ou posterior.
  • API REST do Armazenamento do Azure versão 2021-08-06.
  • Azure Synapse Link para Dataverse versão 1.0 ou posterior.

Configure entidades virtuais ou tabelas diretas (opcional)

Entidades virtuais e tabelas diretas tornam os dados de fontes que não são do Dataverse (como o D365 Finance & Operacional) disponíveis no Dataverse sem a necessidade de copiar os dados. Para fontes que não sejam do Dataverse, você deve configurar entidades virtuais ou tabelas diretas antes de configurar o Azure Synapse Link.

Para configurar entidades virtuais:

  1. No Power Apps , acesse a página Ambientes e clique em Aplicativos do Dynamics 365 .

  2. Para vincular entidades de F&O como entidades virtuais no Dataverse, instale as soluções de Entidade Virtual de Finanças e Operações .

  3. Configure a autorização de serviço para serviço (S2S) entre o Dataverse e seu aplicativo de F&O. Isso permite que o Dataverse se comunique com seu aplicativo. Para obter detalhes, consulte a documentação da Microsoft:

  4. Configure a autorização de serviço para serviço (S2S) entre o Dataverse e seu aplicativo de F&O. Isso permite que o Dataverse se comunique com seu aplicativo. Para obter detalhes, consulte a documentação da Microsoft sobre como configurar entidades virtuais do Dataverse.

  5. Para cada entidade virtual que você deseja importar, habilite o recurso "Controlar alterações" em "Propriedades avançadas" .

  6. Por default, a solução F&O Virtual Entity default algumas entidades virtuais na lista de tabelas do Dataverse. No entanto, você pode expor entidades adicionais manualmente:

    1. Acesse a página de Configurações Avançadas do seu ambiente Dataverse.
    2. Clique no ícone de filtro no canto superior direito para acessar a pesquisa avançada.
    3. Selecione "Entidades de Financiamento e Operações Disponíveis" no menu suspenso e clique em "Resultados" .
    4. Selecione a entidade virtual que deseja expor.
    5. Na página de administração da entidade , alterne a opção Visível para Verdadeiro e clique em Salvar e fechar .

Agora você pode ver a entidade na lista de tabelas do Dataverse com um nome que começa com mserp_.

importante

Entidades virtuais e tabelas diretas devem ser configuradas e sincronizadas corretamente antes de aparecerem no Azure Synapse Link. Dê algum tempo para que eles fiquem disponíveis.

Nesta etapa, você usará o Synapse Link para Dataverse no Azure Data Lake para escolher as tabelas que deseja ingerir.

nota

Synapse Link para Dataverse para o Azure Data Lake substitui o serviço anteriormente conhecido como Exportar dados para o Azure Data Lake Storage Gen2. Apesar do nome, este recurso não usa nem depende do Azure Synapse Analytics; trata-se de um serviço de exportação contínua do Dataverse para o ADLS Gen 2.

nota

Por default, Microsoft atualmente suporta até 1.000 tabelas por perfil Synapse Link. Se sua aplicação tiver mais tabelas selecionadas, você precisará criar vários perfis.

  1. No portal do Power Apps , clique em Analisar e, em seguida, em Vincular ao Azure Synapse .

  2. Clique em Novo Link . O Dataverse preencherá automaticamente sua inscrição ativa do mesmo tenant. Selecione a inscrição apropriada no dropdown.

  3. Não selecione a caixa de seleção Conectar ao seu espaço de trabalho Azure Synapse Analytics . (Os dados devem ser enviados em formato CSV, pois o conector não oferece suporte ao formato Parquet no momento.)

nota

Para usar este conector, você não pode adicionar tabelas do Dataverse a uma account de armazenamento existente que já esteja vinculada a um perfil diferente Synapse Link. Você precisa ter acesso a uma inscrição Azure não vinculada para poder criar um novo perfil Synapse Link.

  1. Na página de criação de links do Synapse , clique em Avançado . Em seguida, ative a opção Mostrar configurações avançadas .

  2. Ative a opção "Atualizar estrutura de pastas incrementalmente" e defina o intervalo de atualização desejado para o Synapse Link. O mínimo é de 5 minutos. Este intervalo aplica-se a todas as tabelas incluídas neste link do Synapse. (Você definirá um programador para seu pipeline Databricks em uma etapa separada.)

  3. Selecione as tabelas que deseja sincronizar, mantendo as configurações "Acrescentar apenas" e "Particionar" como default.

    • Se estiver importando dados de um aplicativo nativo do Dataverse, selecione as tabelas relevantes do Dataverse diretamente na seção Dataverse .
    • Ao importar dados do F&O, você pode selecionar tabelas diretas da seção D365 Finance & operações ou entidades virtuais da seção Dataverse (prefixo mserp_). Para mais informações sobre entidades virtuais, consulte o passo 1.
  4. Clique em Salvar .

  5. Após salvar as alterações, a sincronização inicial do Synapse Link deverá começar.

nota

Para usuários de F&O (Futures & Options), essa sincronização inicial pode levar horas para tabelas grandes com centenas de gigabytes. Mas se a sincronização inicial de uma entidade demorar muito, você pode acelerá-la criando um índice na tabela por meio do aplicativo F&O.

  • Navegue até a tabela que deseja indexar no ambiente F&O.
  • Crie uma extensão para a tabela.
  • Dentro da extensão da tabela, defina um novo índice.
  • Adicione os campos que deseja incluir no índice. (Isso ajuda a acelerar a busca no banco de dados com base nesses campos.)
  • Salve e implemente as alterações no seu ambiente de F&O.

Crie um aplicativo Entra ID para ingestão.

Nesta etapa, você coletará as informações do ID de entrada necessárias para criar uma conexão com Unity Catalog que suporte a ingestão no Databricks.

  1. Colete o IDtenant do seu tenant do Entra ID (portal.azure.com >> Microsoft Entra ID >> tabVisão geral >> ID do locatário , listado no painel direito).

  2. Ao criar um link do Azure Synapse, o Azure Synapse cria um contêiner do ADLS para sincronizar as tabelas selecionadas. Localize o nome do contêiner ADLS visitando a página de administração do Synapse Link.

  3. Colete as credenciais de acesso para o contêiner ADLS.

    1. Crie um aplicativo Microsoft Entra ID, caso ainda não tenha um.
    2. Recolha o segredo do cliente .
    3. Colete o ID do aplicativo (portal.azure.com >> IDMicrosoft Entra >> gerenciar >> Registros de aplicativos ).
  4. Conceda ao aplicativo Entra ID acesso ao contêiner ADLS, caso ainda não o tenha feito.

nota

Certifique-se de que seu aplicativo Entra ID tenha acesso aos contêineres ADLS associados a cada perfil do Synapse Link. Se você estiver importando dados de vários ambientes ou aplicativos, confirme se o aplicativo possui atribuições de função em todos os contêineres relevantes.

  1. Acesse sua conta de armazenamentoAzure e selecione seu contêiner ou account de armazenamento. (A Databricks recomenda o nível do contêiner para manter o princípio do menor privilégio.)
  2. Clique em Controle de Acesso (IAM) e, em seguida, em Adicionar atribuição de função .
  3. Selecione a função de acesso Colaborador do Blob de Armazenamento → Leitura/Gravação/Exclusão. (Caso sua organização não permita isso, entre em contato com a equipe da sua account Databricks .)
  4. Clique em Avançar e, em seguida, selecione Membros .
  5. Selecione Usuário, grupo ou entidade de serviço e, em seguida , Pesquise o registro do seu aplicativo . (Caso o aplicativo não esteja presente nos resultados da pesquisa, você pode inserir explicitamente o ID do objeto na barra de pesquisa e pressionar Enter ).
  6. Clique em Revisar + Atribuir .
  7. Para confirmar se as permissões estão configuradas corretamente, você pode verificar o Controle de Acesso do seu contêiner.

Criar um pipeline do Dynamics 365

Use a interface do usuário

  1. No menu à esquerda, clique em Novo e, em seguida, em Adicionar ou upload dados .
  2. Na página Adicionar dados , clique no bloco do Dynamics 365 .
  3. A partir daí, siga as instruções do assistente.

Utilize a API

Criar uma conexão com o Dynamics 365

Nesta etapa, você criará uma conexão com Unity Catalog para armazenar com segurança suas credenciais do D365 e iniciar a ingestão no Databricks.

  1. Em seu workspace, clique em Catálogo > Dados Externos > Conexões > Criar Conexão .
  2. Forneça um nome de conexão exclusivo e, em seguida, selecione Dynamics 365 como o tipo de conexão .
  3. Insira o segredo do cliente e o ID do cliente do aplicativo Entra ID criado na etapa anterior. Não modifique o escopo. Clique em Avançar .
  4. Insira o nome da conta do ArmazenamentoAzure , o ID do locatário e o nome do contêinerADLS e clique em Criar conexão.
  5. Anote o nome da conexão.

Criar um pipeline do Dynamics 365

Nesta etapa, você configurará o pipeline de ingestão. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome no destino.

Existem duas opções para criar o pipeline de ingestão:

  • Use um caderno
  • Use a CLI do Databricks

Ambas as abordagens fazem chamadas de API para um serviço do Databricks que cria o pipeline.

Notebook:

  1. Copie o Notebook padrão.
  2. executar a primeira célula do Notebook sem modificá-la.
  3. Modifique a segunda célula do Notebook com os detalhes do seu pipeline (por exemplo, a tabela da qual você deseja ingerir os dados, onde deseja armazená-los, etc.).
  4. execução da segunda célula do Caderno padrão; esta execução create_pipeline.
  5. Você pode executar list_pipeline para mostrar o ID pipeline e seus detalhes.
  6. Você pode executar edit_pipeline para editar a definição pipeline .
  7. Você pode executar delete_pipeline para excluir o pipeline.

CLI:

Para criar o pipeline:

Bash
databricks pipelines create --json "<pipeline_definition OR json file path>"

Para editar o pipeline:

Bash
databricks pipelines update --json "<<pipeline_definition OR json file path>"

Para obter a definição do pipeline:

Bash
databricks pipelines get "<your_pipeline_id>"

Para excluir o pipeline:

Bash
databricks pipelines delete "<your_pipeline_id>"

Para mais informação pode sempre executar:

Bash
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

Configurar recurso adicional (opcional)

O conector oferece recursos adicionais, como SCD tipo 2 para história acompanhamento, seleção e deseleção em nível de coluna. Consulte Padrões comuns para gerenciar o pipeline de ingestão.

Notebook

Célula 1

Não modifique esta célula.

Python
# DO NOT MODIFY

# This sets up the API utils for creating managed ingestion pipelines in Databricks.

import requests
import json

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"

headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}

def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")

def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)

def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)

def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)

def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)

def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)

def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)

def stop_pipeline(id: str):
print("cannot stop pipeline")

Célula 2

Não modifique o canal PREVIEW no código abaixo.

Se você deseja ingerir todas as tabelas sincronizadas pelo seu Azure Synapse Link, use a especificação em nível de esquema. (Observe, no entanto, que a Databricks não recomenda adicionar mais de 250 tabelas por pipeline.) Se você deseja ingerir apenas tabelas específicas, use a especificação em nível de tabela.

Certifique-se de que o nome da tabela de origem corresponda ao nome da tabela que aparece na coluna Nome da página de gerenciamento Synapse Link.

Python
# Option A: schema-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
Python
# Option B: table-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

Exemplo: SCD tipo 2

Por default, a API usa o tipo SCD 1. Isso significa que ele sobrescreve os dados no destino se eles forem editados na origem. Se preferir preservar os dados históricos e usar o tipo 2 SCD , especifique isso na configuração. Por exemplo:

Python
# Schema-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)
Python
# Table-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)

Exemplo: Seleção e deseleção em nível de coluna

Por default, a API importa todas as colunas da tabela selecionada. No entanto, você pode optar por incluir ou excluir colunas específicas. Por exemplo:

Python
# Table spec with included and excluded columns.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"include_columns": ["<COLUMN_A>", "<COLUMN_B>", "<COLUMN_C>"]
}
}
}
]
},
"channel": "PREVIEW"
}
"""

create_pipeline(pipeline_spec)