Ingerir dados do SQL Server
Aprenda como importar dados do SQL Server para o Databricks usando LakeFlow Connect.
O conector SQL Server oferece suporte aos bancos de dados SQL Azure , à Instância de Gerenciamento Azure SQL e aos bancos de dados SQL Amazon RDS. Isso inclui o SQL Server executado em máquinas virtuais (VMs) do Azure e no Amazon EC2. O conector também oferece suporte SQL Server on-premises usando as redes Azure ExpressRoute e AWS Direct Connect.
Requisitos
-
Para criar um gateway de ingestão e um pipeline de ingestão, você deve primeiro atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
O compute sem servidor está habilitado para o seu workspace. Consulte os requisitos do compute sem servidor.
-
Se você planeja criar uma conexão: Você tem privilégios
CREATE CONNECTIONno metastore. Consulte a seção sobre privilégios de gerenciamento no Unity Catalog.Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.
-
Se você planeja usar uma conexão existente: Você tem privilégios
USE CONNECTIONouALL PRIVILEGESna conexão. -
Você tem privilégios
USE CATALOGno catálogo de destino. -
Você tem privilégios
USE SCHEMA,CREATE TABLEeCREATE VOLUMEem um esquema existente ou privilégiosCREATE SCHEMAno catálogo de destino. -
O senhor tem acesso a uma instância primária do SQL Server. Os recursos de acompanhamento de alterações e captura de dados de alterações (CDC) não são suportados em réplicas de leitura ou instâncias secundárias.
-
Permissões irrestritas para criar clusters ou uma política personalizada (somente API). Uma política personalizada para o gateway deve atender aos seguintes requisitos:
-
Família: Job compute
-
A família de políticas substitui:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
}- Databricks recomenda especificar o menor número possível de nós worker para gateways de ingestão porque eles não afetam o desempenho do gateway. A política compute a seguir permite que o Databricks dimensione o gateway de ingestão para atender às necessidades de sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração eficiente e eficiente de dados do seu banco de dados de origem.
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "r5n.16xlarge"
},
"node_type_id": {
"type": "fixed",
"value": "m5n.large"
}
}Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de compute.
-
-
-
Para importar dados do SQL Server, você deve primeiro concluir os passos descritos em Configurar Microsoft SQL Server para importação para o Databricks.
Crie um gateway e um pipeline de ingestão.
- Databricks UI
- Databricks Asset Bundles
- Databricks notebook
- Terraform
-
Na barra lateral do site Databricks workspace, clique em ingestão de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique em SQL Server .
-
Na página Conexão do assistente de ingestão, selecione a conexão que armazena as credenciais de acesso do SQL Server em Configurar o Microsoft SQL Server para ingestão no Databricks. Se você tiver o privilégio
CREATE CONNECTIONno metastore, poderá clicar.Criar conexão para estabelecer uma nova conexão com os detalhes de autenticação no SQL Server.
-
Clique em Avançar .
-
Na página de configuração de ingestão , insira um nome exclusivo para o pipeline de ingestão. Este pipeline move dados do local de armazenamento temporário para o destino.
-
Selecione um catálogo e um esquema para gravar logs de eventos. O log de eventos contém logs de auditoria, verificações de qualidade de dados, progresso pipeline e erros. Se você tiver privilégios
USE CATALOGeCREATE SCHEMAno catálogo, poderá clicar.Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.
-
(Opcional) Defina a refresh automática completa para todas as tabelas como Ativada . Quando refresh automática está ativada, o pipeline tenta corrigir automaticamente problemas como eventos de limpeza log e certos tipos de evolução do esquema, atualizando completamente a tabela afetada. Se a história acompanhamento estiver habilitada, uma refresh completa apagará essa história.
-
Insira um nome exclusivo para o gateway de ingestão. O gateway é um pipeline que extrai as alterações da origem e as prepara para que o pipeline de ingestão as carregue.
-
Selecione um catálogo e um esquema para o local de preparação . Neste local é criado um volume para estágio de remoção de dados. Se você tiver privilégios
USE CATALOGeCREATE SCHEMAno catálogo, poderá clicar.Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.
-
Clique em Create pipeline (Criar pipeline) e continue .
-
Na página Origem , selecione as tabelas que deseja importar. Se você selecionar tabelas específicas, poderá configurar as definições da tabela:
a. (Opcional) Na tab Configurações , especifique um nome de destino para cada tabela ingerida. Isso é útil para diferenciar entre tabelas de destino quando você ingere um objeto no mesmo esquema várias vezes. Consulte Nomear uma tabela de destino.
um. (Opcional) Altere a configuração default da história acompanhamento . Consulte Habilitar história envio (SCD tipo 2).
-
Clique em Avançar e, em seguida, clique em Salvar e continuar .
-
Na página Destino , selecione um catálogo e um esquema para carregar os dados. Se você tiver privilégios
USE CATALOGeCREATE SCHEMAno catálogo, poderá clicar.Para criar um novo esquema, clique em "Criar esquema" no menu suspenso.
-
Clique em Salvar e continuar .
-
Na página de configuração do banco de dados , clique em Validar para confirmar se sua fonte está configurada corretamente para ingestão no Databricks. Quaisquer configurações ausentes serão retornadas. Para saber os passos para resolver, clique em Concluir configuração . Em seguida, clique em Avançar . Alternativamente, clique em Ignorar validação .
-
(Opcional) Na página de programação e notificações , clique em
Criar programar . Defina a frequência de refresh das tabelas de destino.
-
(Opcional) Clique
Adicione uma notificação para configurar notificações email para operações pipeline bem-sucedidas ou com falha e, em seguida, clique em Salvar e execute pipeline .
Antes de iniciar a ingestão de dados usando Pacotes de Automação Declarativa, você precisa ter acesso a uma conexão existente. Para obter instruções, consulte Conectar-se às fontes de ingestão de gerenciamento.
O catálogo e o esquema de preparação podem ser os mesmos que o catálogo e o esquema de destino. O catálogo de encenação não pode ser um catálogo estrangeiro. Especifique o local de preparação na seção gateway_definition do seu arquivo YAML de pipeline de pacotes.
O gateway de ingestão extrai o Snapshot e altera os dados do banco de dados de origem e os armazena no volume de preparação Unity Catalog. O senhor deve executar o gateway como um pipeline contínuo. Isso ajuda a acomodar quaisquer políticas de retenção de log de alterações que o senhor tenha no banco de dados de origem.
A ingestão pipeline aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.
Os pacotes podem conter definições YAML de Job e tarefa, são gerenciados usando a CLI Databricks e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, teste e produção). Para mais informações, consulte O que são pacotes de automação declarativa?.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init -
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (por exemplo,
resources/sqlserver_pipeline.yml). Consulte pipeline.ingestion_definition e exemplos. - Um arquivo de definição de trabalho que controla a frequência de ingestão de dados (por exemplo,
resources/sqlserver_job.yml).
- Um arquivo de definição de pipeline (por exemplo,
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
Atualize a célula Configuration no Notebook a seguir com a conexão de origem, o catálogo de destino, o esquema de destino e as tabelas a serem ingeridas da origem.
Você pode usar Terraform para implantar e gerenciar um pipeline de ingestão SQL Server . Para um framework de exemplo completo, incluindo configurações Terraform para criação de gateways e pipeline de ingestão, consulte os exemplos Terraform LakeFlow Connect catalogados no GitHub.
Verificar se a ingestão de dados foi bem-sucedida
A lista view na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

As colunas Upserted records e Deleted records não são exibidas por default. Você pode ativá-las clicando no botão de configuração das colunas e selecionando-as.
Exemplos
Utilize esses exemplos para configurar seu pipeline.
Configuração do pipeline
- Databricks Asset Bundles
- Databricks notebook
O seguinte arquivo de definição de pipeline:
variables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
Segue abaixo um exemplo da seção Configuration de uma especificação de pipeline:
# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"
# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"
# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name
# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"
# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"
# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
# + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])
Arquivo de definição de trabalho de pacote
Segue abaixo um exemplo de arquivo de definição de tarefa para uso com pacotes de automação declarativa. A execução do trabalho ocorre todos os dias, exatamente um dia após a última execução.
resources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
Padrões comuns
Para configurações avançadas pipeline , consulte Padrões comuns para gerenciar pipeline de ingestão.
Próximos passos
começar, programar e definir alerta em seu pipeline. Consulte Tarefa comum de manutenção pipeline.