Configurar fonte de dados para ingestão Microsoft Dynamics 365
Visualização
Este recurso está em Pré-visualização Pública.
Aprenda como configurar Microsoft Dynamics 365 como fonte de dados para ingestão no Databricks usando LakeFlow Connect.
Para obter informações sobre como o conector acessa seus dados de origem, consulte Como o conector acessa os dados do D365?. Para obter uma lista dos aplicativos Dataverse compatíveis, consulte Quais aplicativos do Dynamics 365 são compatíveis?.
Pré-requisitos
Antes de configurar a fonte de dados do D365, você precisa ter:
- Uma inscrição ativa Azure com permissões para criar recurso.
- Um ambiente Microsoft Dynamics 365 com acesso de administrador.
- Um ambiente Dataverse associado à sua instância do D365.
- Permissões de administrador do espaço de trabalho ou administrador do metastore no Databricks.
- Permissões para criar e configurar o Azure Synapse Link no seu ambiente Dataverse.
- Uma account de armazenamento ADLS Gen2 (ou permissões para criar uma).
- Permissões para criar e configurar aplicativos Microsoft Entra ID.
- API Dataverse v9.2 ou posterior.
- API REST do Armazenamento do Azure versão 2021-08-06.
- Azure Synapse Link para Dataverse versão 1.0 ou posterior.
Configure entidades virtuais ou tabelas diretas (opcional)
Entidades virtuais e tabelas diretas tornam os dados de fontes que não são do Dataverse (como o D365 Finance & Operacional) disponíveis no Dataverse sem a necessidade de copiar os dados. Para fontes que não sejam do Dataverse, você deve configurar entidades virtuais ou tabelas diretas antes de configurar o Azure Synapse Link.
Para configurar entidades virtuais:
-
No Power Apps , acesse a página Ambientes e clique em Aplicativos do Dynamics 365 .
-
Para vincular entidades de F&O como entidades virtuais no Dataverse, instale as soluções de Entidade Virtual de Finanças e Operações .
-
Configure a autorização de serviço para serviço (S2S) entre o Dataverse e seu aplicativo de F&O. Isso permite que o Dataverse se comunique com seu aplicativo. Para obter detalhes, consulte a documentação da Microsoft:
-
Configure a autorização de serviço para serviço (S2S) entre o Dataverse e seu aplicativo de F&O. Isso permite que o Dataverse se comunique com seu aplicativo. Para obter detalhes, consulte a documentação da Microsoft sobre como configurar entidades virtuais do Dataverse.
-
Para cada entidade virtual que você deseja importar, habilite o recurso "Controlar alterações" em "Propriedades avançadas" .
-
Por default, a solução F&O Virtual Entity default algumas entidades virtuais na lista de tabelas do Dataverse. No entanto, você pode expor entidades adicionais manualmente:
- Acesse a página de Configurações Avançadas do seu ambiente Dataverse.
- Clique no ícone de filtro no canto superior direito para acessar a pesquisa avançada.
- Selecione "Entidades de Financiamento e Operações Disponíveis" no menu suspenso e clique em "Resultados" .
- Selecione a entidade virtual que deseja expor.
- Na página de administração da entidade , alterne a opção Visível para Verdadeiro e clique em Salvar e fechar .
Agora você pode ver a entidade na lista de tabelas do Dataverse com um nome que começa com mserp_.
Entidades virtuais e tabelas diretas devem ser configuradas e sincronizadas corretamente antes de aparecerem no Azure Synapse Link. Dê algum tempo para que eles fiquem disponíveis.
Configurar o Azure Synapse Link
Nesta etapa, você usará o Synapse Link para Dataverse no Azure Data Lake para escolher as tabelas que deseja ingerir.
Synapse Link para Dataverse para o Azure Data Lake substitui o serviço anteriormente conhecido como Exportar dados para o Azure Data Lake Storage Gen2. Apesar do nome, este recurso não usa nem depende do Azure Synapse Analytics; trata-se de um serviço de exportação contínua do Dataverse para o ADLS Gen 2.
Por default, Microsoft atualmente suporta até 1.000 tabelas por perfil Synapse Link. Se sua aplicação tiver mais tabelas selecionadas, você precisará criar vários perfis.
-
No portal do Power Apps , clique em Analisar e, em seguida, em Vincular ao Azure Synapse .
-
Clique em Novo Link . O Dataverse preencherá automaticamente sua inscrição ativa do mesmo tenant. Selecione a inscrição apropriada no dropdown.
-
Não selecione a caixa de seleção Conectar ao seu espaço de trabalho Azure Synapse Analytics . (Os dados devem ser enviados em formato CSV, pois o conector não oferece suporte ao formato Parquet no momento.)
Para usar este conector, você não pode adicionar tabelas do Dataverse a uma account de armazenamento existente que já esteja vinculada a um perfil diferente Synapse Link. Você precisa ter acesso a uma inscrição Azure não vinculada para poder criar um novo perfil Synapse Link.
-
Na página de criação de links do Synapse , clique em Avançado . Em seguida, ative a opção Mostrar configurações avançadas .
-
Ative a opção "Atualizar estrutura de pastas incrementalmente" e defina o intervalo de atualização desejado para o Synapse Link. O mínimo é de 5 minutos. Este intervalo aplica-se a todas as tabelas incluídas neste link do Synapse. (Você definirá um programador para seu pipeline Databricks em uma etapa separada.)
-
Selecione as tabelas que deseja sincronizar, mantendo as configurações "Acrescentar apenas" e "Particionar" como default.
- Se estiver importando dados de um aplicativo nativo do Dataverse, selecione as tabelas relevantes do Dataverse diretamente na seção Dataverse .
- Ao importar dados do F&O, você pode selecionar tabelas diretas da seção D365 Finance & operações ou entidades virtuais da seção Dataverse (prefixo
mserp_). Para mais informações sobre entidades virtuais, consulte o passo 1.
-
Clique em Salvar .
-
Após salvar as alterações, a sincronização inicial do Synapse Link deverá começar.
Para usuários de F&O (Futures & Options), essa sincronização inicial pode levar horas para tabelas grandes com centenas de gigabytes. Mas se a sincronização inicial de uma entidade demorar muito, você pode acelerá-la criando um índice na tabela por meio do aplicativo F&O.
- Navegue até a tabela que deseja indexar no ambiente F&O.
- Crie uma extensão para a tabela.
- Dentro da extensão da tabela, defina um novo índice.
- Adicione os campos que deseja incluir no índice. (Isso ajuda a acelerar a busca no banco de dados com base nesses campos.)
- Salve e implemente as alterações no seu ambiente de F&O.
Crie um aplicativo Entra ID para ingestão.
Nesta etapa, você coletará as informações do ID de entrada necessárias para criar uma conexão com Unity Catalog que suporte a ingestão no Databricks.
-
Colete o IDtenant do seu tenant do Entra ID (
portal.azure.com>> Microsoft Entra ID >> tabVisão geral >> ID do locatário , listado no painel direito). -
Ao criar um link do Azure Synapse, o Azure Synapse cria um contêiner do ADLS para sincronizar as tabelas selecionadas. Localize o nome do contêiner ADLS visitando a página de administração do Synapse Link.
-
Colete as credenciais de acesso para o contêiner ADLS.
- Crie um aplicativo Microsoft Entra ID, caso ainda não tenha um.
- Recolha o segredo do cliente .
- Colete o ID do aplicativo (
portal.azure.com>> IDMicrosoft Entra >> gerenciar >> Registros de aplicativos ).
-
Conceda ao aplicativo Entra ID acesso ao contêiner ADLS, caso ainda não o tenha feito.
Certifique-se de que seu aplicativo Entra ID tenha acesso aos contêineres ADLS associados a cada perfil do Synapse Link. Se você estiver importando dados de vários ambientes ou aplicativos, confirme se o aplicativo possui atribuições de função em todos os contêineres relevantes.
- Acesse sua conta de armazenamentoAzure e selecione seu contêiner ou account de armazenamento. (A Databricks recomenda o nível do contêiner para manter o princípio do menor privilégio.)
- Clique em Controle de Acesso (IAM) e, em seguida, em Adicionar atribuição de função .
- Selecione a função de acesso Colaborador do Blob de Armazenamento → Leitura/Gravação/Exclusão. (Caso sua organização não permita isso, entre em contato com a equipe da sua account Databricks .)
- Clique em Avançar e, em seguida, selecione Membros .
- Selecione Usuário, grupo ou entidade de serviço e, em seguida , Pesquise o registro do seu aplicativo . (Caso o aplicativo não esteja presente nos resultados da pesquisa, você pode inserir explicitamente o ID do objeto na barra de pesquisa e pressionar Enter ).
- Clique em Revisar + Atribuir .
- Para confirmar se as permissões estão configuradas corretamente, você pode verificar o Controle de Acesso do seu contêiner.
Criar um pipeline do Dynamics 365
Use a interface do usuário
- No menu à esquerda, clique em Novo e, em seguida, em Adicionar ou upload dados .
- Na página Adicionar dados , clique no bloco do Dynamics 365 .
- A partir daí, siga as instruções do assistente.
Utilize a API
Criar uma conexão com o Dynamics 365
Nesta etapa, você criará uma conexão com Unity Catalog para armazenar com segurança suas credenciais do D365 e iniciar a ingestão no Databricks.
- Em seu workspace, clique em Catálogo > Dados Externos > Conexões > Criar Conexão .
- Forneça um nome de conexão exclusivo e, em seguida, selecione Dynamics 365 como o tipo de conexão .
- Insira o segredo do cliente e o ID do cliente do aplicativo Entra ID criado na etapa anterior. Não modifique o escopo. Clique em Avançar .
- Insira o nome da conta do ArmazenamentoAzure , o ID do locatário e o nome do contêinerADLS e clique em Criar conexão.
- Anote o nome da conexão.
Criar um pipeline do Dynamics 365
Nesta etapa, você configurará o pipeline de ingestão. Cada tabela ingerida recebe uma tabela de transmissão correspondente com o mesmo nome no destino.
Existem duas opções para criar o pipeline de ingestão:
- Use um caderno
- Use a CLI do Databricks
Ambas as abordagens fazem chamadas de API para um serviço do Databricks que cria o pipeline.
Notebook:
- Copie o Notebook padrão.
- executar a primeira célula do Notebook sem modificá-la.
- Modifique a segunda célula do Notebook com os detalhes do seu pipeline (por exemplo, a tabela da qual você deseja ingerir os dados, onde deseja armazená-los, etc.).
- execução da segunda célula do Caderno padrão; esta execução
create_pipeline. - Você pode executar
list_pipelinepara mostrar o ID pipeline e seus detalhes. - Você pode executar
edit_pipelinepara editar a definição pipeline . - Você pode executar
delete_pipelinepara excluir o pipeline.
CLI:
Para criar o pipeline:
databricks pipelines create --json "<pipeline_definition OR json file path>"
Para editar o pipeline:
databricks pipelines update --json "<<pipeline_definition OR json file path>"
Para obter a definição do pipeline:
databricks pipelines get "<your_pipeline_id>"
Para excluir o pipeline:
databricks pipelines delete "<your_pipeline_id>"
Para mais informação pode sempre executar:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
Configurar recurso adicional (opcional)
O conector oferece recursos adicionais, como SCD tipo 2 para história acompanhamento, seleção e deseleção em nível de coluna. Consulte Padrões comuns para gerenciar o pipeline de ingestão.
Notebook
Célula 1
Não modifique esta célula.
# DO NOT MODIFY
# This sets up the API utils for creating managed ingestion pipelines in Databricks.
import requests
import json
notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
api_token = notebook_context.apiToken().get()
workspace_url = notebook_context.apiUrl().get()
api_url = f"{workspace_url}/api/2.0/pipelines"
headers = {
'Authorization': 'Bearer {}'.format(api_token),
'Content-Type': 'application/json'
}
def check_response(response):
if response.status_code == 200:
print("Response from API:\n{}".format(json.dumps(response.json(), indent=2, sort_keys=False)))
else:
print(f"Failed to retrieve data: error_code={response.status_code}, error_message={response.json().get('message', response.text)}")
def create_pipeline(pipeline_definition: str):
response = requests.post(url=api_url, headers=headers, data=pipeline_definition)
check_response(response)
def edit_pipeline(id: str, pipeline_definition: str):
response = requests.put(url=f"{api_url}/{id}", headers=headers, data=pipeline_definition)
check_response(response)
def delete_pipeline(id: str):
response = requests.delete(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def list_pipeline(filter: str):
body = "" if len(filter) == 0 else f"""{{"filter": "{filter}"}}"""
response = requests.get(url=api_url, headers=headers, data=body)
check_response(response)
def get_pipeline(id: str):
response = requests.get(url=f"{api_url}/{id}", headers=headers)
check_response(response)
def start_pipeline(id: str, full_refresh: bool=False):
body = f"""
{{
"full_refresh": {str(full_refresh).lower()},
"validate_only": false,
"cause": "API_CALL"
}}
"""
response = requests.post(url=f"{api_url}/{id}/updates", headers=headers, data=body)
check_response(response)
def stop_pipeline(id: str):
print("cannot stop pipeline")
Célula 2
Não modifique o canal PREVIEW no código abaixo.
Se você deseja ingerir todas as tabelas sincronizadas pelo seu Azure Synapse Link, use a especificação em nível de esquema. (Observe, no entanto, que a Databricks não recomenda adicionar mais de 250 tabelas por pipeline.) Se você deseja ingerir apenas tabelas específicas, use a especificação em nível de tabela.
Certifique-se de que o nome da tabela de origem corresponda ao nome da tabela que aparece na coluna Nome da página de gerenciamento Synapse Link.
# Option A: schema-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
# Option B: table-level spec
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>"
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
Exemplo: SCD tipo 2
Por default, a API usa o tipo SCD 1. Isso significa que ele sobrescreve os dados no destino se eles forem editados na origem. Se preferir preservar os dados históricos e usar o tipo 2 SCD , especifique isso na configuração. Por exemplo:
# Schema-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"schema": {
"source_schema": "objects",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
# Table-level spec with SCD type 2
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTION_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"scd_type": "SCD_TYPE_2"
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)
Exemplo: Seleção e deseleção em nível de coluna
Por default, a API importa todas as colunas da tabela selecionada. No entanto, você pode optar por incluir ou excluir colunas específicas. Por exemplo:
# Table spec with included and excluded columns.
pipeline_spec = """
{
"name": "<YOUR_PIPELINE_NAME>",
"ingestion_definition": {
"connection_name": "<YOUR_CONNECTON_NAME>",
"objects": [
{
"table": {
"source_schema": "objects",
"source_table": "<YOUR_F_AND_O_TABLE_NAME>",
"destination_catalog": "<YOUR_DATABRICKS_CATALOG>",
"destination_schema": "<YOUR_DATABRICKS_SCHEMA>",
"table_configuration": {
"include_columns": ["<COLUMN_A>", "<COLUMN_B>", "<COLUMN_C>"]
}
}
}
]
},
"channel": "PREVIEW"
}
"""
create_pipeline(pipeline_spec)