Pular para o conteúdo principal

Ingerir dados do SQL Server

Aprenda como importar dados do SQL Server para o Databricks usando LakeFlow Connect.

O conector SQL Server oferece suporte aos bancos de dados SQL Azure , à Instância de Gerenciamento Azure SQL e aos bancos de dados SQL Amazon RDS. Isso inclui o SQL Server executado em máquinas virtuais (VMs) do Azure e no Amazon EC2. O conector também oferece suporte SQL Server on-premises usando as redes Azure ExpressRoute e AWS Direct Connect.

Requisitos

  • Para criar um gateway de ingestão e um pipeline de ingestão, você deve primeiro atender aos seguintes requisitos:

    • Seu workspace está habilitado para Unity Catalog.

    • O compute sem servidor está habilitado para o seu workspace. Consulte os requisitos do compute sem servidor.

    • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION no metastore. Consulte a seção sobre privilégios de gerenciamento no Unity Catalog.

      Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.

    • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

    • Você tem privilégios USE CATALOG no catálogo de destino.

    • Você tem privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

    • O senhor tem acesso a uma instância primária do SQL Server. Os recursos de acompanhamento de alterações e captura de dados de alterações (CDC) não são suportados em réplicas de leitura ou instâncias secundárias.

    • Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:

      • Família: Job compute

      • A família de políticas substitui:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
      • Databricks recomenda especificar o menor número possível de nós worker para gateways de ingestão porque eles não afetam o desempenho do gateway. A política compute a seguir permite que o Databricks dimensione o gateway de ingestão para atender às necessidades de sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração eficiente e eficiente de dados do seu banco de dados de origem.
      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "r5n.16xlarge"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "m5n.large"
      }
      }

      Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de compute.

  • Para importar dados do SQL Server, você deve primeiro concluir os passos descritos em Configurar Microsoft SQL Server para importação para o Databricks.

Crie um gateway e um pipeline de ingestão.

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em SQL Server .

  3. Na página Conexão do assistente de ingestão, selecione a conexão que armazena as credenciais de acesso do SQL Server em Configurar o Microsoft SQL Server para ingestão no Databricks. Se você tiver o privilégio CREATE CONNECTION no metastore, poderá clicar. Ícone de mais (+). Criar conexão para estabelecer uma nova conexão com os detalhes de autenticação no SQL Server.

  4. Clique em Avançar .

  5. Na página de configuração de ingestão , insira um nome exclusivo para o pipeline de ingestão. Este pipeline move dados do local de armazenamento temporário para o destino.

  6. Selecione um catálogo e um esquema para gravar logs de eventos. O log de eventos contém logs de auditoria, verificações de qualidade de dados, progresso pipeline e erros. Se você tiver privilégios USE CATALOG e CREATE SCHEMA no catálogo, poderá clicar. Ícone de mais (+). Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.

  7. (Opcional) Defina a refresh automática completa para todas as tabelas como Ativada . Quando refresh automática está ativada, o pipeline tenta corrigir automaticamente problemas como eventos de limpeza log e certos tipos de evolução do esquema, atualizando completamente a tabela afetada. Se a história acompanhamento estiver habilitada, uma refresh completa apagará essa história.

  8. Insira um nome exclusivo para o gateway de ingestão. O gateway é um pipeline que extrai as alterações da origem e as prepara para que o pipeline de ingestão as carregue.

  9. Selecione um catálogo e um esquema para o local de preparação . Neste local é criado um volume para estágio de remoção de dados. Se você tiver privilégios USE CATALOG e CREATE SCHEMA no catálogo, poderá clicar. Ícone de mais (+). Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.

  10. Clique em Create pipeline (Criar pipeline) e continue .

  11. Na página Origem , selecione as tabelas que deseja importar. Se você selecionar tabelas específicas, poderá configurar as definições da tabela:

    a. (Opcional) Na tab Configurações , especifique um nome de destino para cada tabela ingerida. Isso é útil para diferenciar entre tabelas de destino quando você ingere um objeto no mesmo esquema várias vezes. Consulte Nomear uma tabela de destino.

    um. (Opcional) Altere a configuração default da história acompanhamento . Consulte Habilitar história envio (SCD tipo 2).

  12. Clique em Avançar e, em seguida, clique em Salvar e continuar .

  13. Na página Destino , selecione um catálogo e um esquema para carregar os dados. Se você tiver privilégios USE CATALOG e CREATE SCHEMA no catálogo, poderá clicar. Ícone de mais (+). Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.

  14. Clique em Salvar e continuar .

  15. Na página de configuração do banco de dados , clique em Validar para confirmar se sua fonte está configurada corretamente para ingestão no Databricks. Quaisquer configurações ausentes serão retornadas. Para saber os passos para resolver, clique em Concluir configuração . Em seguida, clique em Avançar . Alternativamente, clique em Ignorar validação .

  16. (Opcional) Na página de programação e notificações , clique em Ícone de mais (+). Criar programar . Defina a frequência de refresh das tabelas de destino.

  17. (Opcional) Clique Ícone de mais (+). Adicione uma notificação para configurar notificações email para operações pipeline bem-sucedidas ou com falha e, em seguida, clique em Salvar e execute pipeline .

Verificar se a ingestão de dados foi bem-sucedida

A lista view na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

Verificar a replicação

As colunas Upserted records e Deleted records não são exibidas por default. Você pode ativá-las clicando no Ícone de configuração de colunas botão de configuração das colunas e selecionando-as.

Exemplos

Utilize esses exemplos para configurar seu pipeline.

Configuração do pipeline

O seguinte arquivo de definição de pipeline:

YAML
variables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema

resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}

pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}

Arquivo de definição de trabalho de pacote

Segue abaixo um exemplo de arquivo de definição de tarefa para uso com pacotes de automação declarativa. A execução do trabalho ocorre todos os dias, exatamente um dia após a última execução.

YAML
resources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job

trigger:
periodic:
interval: 1
unit: DAYS

email_notifications:
on_failure:
- <email-address>

tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}

Padrões comuns

Para configurações avançadas pipeline , consulte Padrões comuns para gerenciar pipeline de ingestão.

Próximos passos

começar, programar e definir alerta em seu pipeline. Consulte Tarefa comum de manutenção pipeline.

Recurso adicional